Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2015/09/25 13:11:03 UTC

svn commit: r1705273 - /jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java

Author: amitj
Date: Fri Sep 25 11:11:01 2015
New Revision: 1705273

URL: http://svn.apache.org/viewvc?rev=1705273&view=rev
Log:
OAK-3443: Track the start time of mark in GC

Test case for SegmentMK which simulates the condition by delaying the blob identification phase and adding additional blobs, which should not be collected
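
The fix being tested records a timestamp when the mark (reference collection) phase
starts and passes it to the sweep phase, so blobs created while a long-running mark is
still in progress are never treated as garbage. A minimal sketch of that idea, using
hypothetical interfaces (ReferenceCollector, BlobLister) rather than the actual Oak API:

    // Sketch of the mark-start tracking behind OAK-3443; the names here are
    // hypothetical stand-ins, not the real Oak classes.
    import java.util.Set;

    interface ReferenceCollector {
        Set<String> collectReferences();             // the "mark" phase
    }

    interface BlobLister {
        Set<String> blobsOlderThan(long timeMillis); // candidates by last-modified time
    }

    class SketchGarbageCollector {
        private final ReferenceCollector marker;
        private final BlobLister store;

        SketchGarbageCollector(ReferenceCollector marker, BlobLister store) {
            this.marker = marker;
            this.store = store;
        }

        int markAndSweep() {
            // Record the time *before* the mark phase starts.
            long markStart = System.currentTimeMillis();
            Set<String> referenced = marker.collectReferences();

            // Sweep only blobs older than the start of mark: anything added while
            // the (possibly slow) mark was running is automatically kept.
            Set<String> candidates = store.blobsOlderThan(markStart);
            candidates.removeAll(referenced);
            return candidates.size();                // blobs that would be deleted
        }
    }

The test below drives exactly this window: TestGarbageCollector adds extra blobs after
mark() returns and asserts that sweep, keyed off the recorded mark start time, leaves
them untouched.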

Modified:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java?rev=1705273&r1=1705272&r2=1705273&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCIT.java Fri Sep 25 11:11:01 2015
@@ -37,18 +37,24 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import com.google.common.io.Closeables;
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.GarbageCollectorFileState;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
@@ -266,6 +272,31 @@ public class SegmentDataStoreBlobGCIT {
         assertEquals(count, candidates);
     }
     
+    @Test
+    public void gcLongRunningBlobCollection() throws Exception {
+        DataStoreState state = setUp();
+        log.info("{} Blobs added {}", state.blobsAdded.size(), state.blobsAdded);
+        log.info("{} Blobs should be present {}", state.blobsPresent.size(), state.blobsPresent);
+        
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
+        String repoId = null;
+        if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+            repoId = ClusterRepositoryInfo.createId(nodeStore);
+            ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+                new ByteArrayInputStream(new byte[0]),
+                REPOSITORY.getNameFromId(repoId));
+        }
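+        // batchCount = 5 and maxLastModifiedInterval = 5000 ms keep the test fast:
+        // sleeping slightly longer than the interval is enough to age the blobs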
+        TestGarbageCollector gc = new TestGarbageCollector(
+            new SegmentBlobReferenceRetriever(store.getTracker()),
+            (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 5, 5000, repoId);
+        gc.collectGarbage(false);
+        Set<String> existingAfterGC = iterate();
+        log.info("{} Blobs existing after gc {}", existingAfterGC.size(), existingAfterGC);
+        
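+        // All blobs referenced before GC must survive; the only extra blobs left
+        // over should be the ones added during the simulated slow mark phase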
+        assertTrue(Sets.difference(state.blobsPresent, existingAfterGC).isEmpty());
+        assertEquals(gc.additionalBlobs, Sets.symmetricDifference(state.blobsPresent, existingAfterGC));
+    }
+    
     private Set<String> gcInternal(long maxBlobGcInSecs) throws Exception {
         ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
         MarkSweepGarbageCollector gc = init(maxBlobGcInSecs, executor);
@@ -318,5 +349,81 @@ public class SegmentDataStoreBlobGCIT {
         r.nextBytes(data);
         return new ByteArrayInputStream(data);
     }
+    
+    /**
+     * Waits for some time and adds additional blobs after the blob references have
+     * been identified, to simulate a long-running blob id collection phase.
+     */
+    class TestGarbageCollector extends MarkSweepGarbageCollector {
+        long maxLastModifiedInterval;
+        String root;
+        GarbageCollectableBlobStore blobStore;
+        Set<String> additionalBlobs;
+        
+        public TestGarbageCollector(BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore,
+                                    Executor executor, String root, int batchCount, long maxLastModifiedInterval,
+                                    @Nullable String repositoryId) throws IOException {
+            super(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, repositoryId);
+            this.root = root;
+            this.blobStore = blobStore;
+            this.maxLastModifiedInterval = maxLastModifiedInterval;
+            this.additionalBlobs = Sets.newHashSet();
+        }
+        
+        @Override
+        protected void markAndSweep(boolean markOnly) throws Exception {
+            boolean threw = true;
+            GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
+            try {
+                Stopwatch sw = Stopwatch.createStarted();
+                LOG.info("Starting Test Blob garbage collection");
+                
+                // Sleep a little more than the max interval to get over the interval for valid blobs
+                Thread.sleep(maxLastModifiedInterval + 100);
+                LOG.info("Slept {} to make blobs old", maxLastModifiedInterval + 100);
+                
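+                // Record when the mark phase starts; sweep() later uses this as the
+                // reference time, which is the behaviour OAK-3443 introduces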
+                long markStart = System.currentTimeMillis();
+                mark(fs);
+                LOG.info("Mark finished");
+                
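+                // Simulate a long-running blob id collection phase by adding blobs
+                // after the references have already been marked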
+                additionalBlobs = createAdditional();
+                
+                if (!markOnly) {
+                    Thread.sleep(maxLastModifiedInterval + 100);
+                    LOG.info("Slept {} to make additional blobs old", maxLastModifiedInterval + 100);
+                    
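+                    // Sweep against the recorded mark start time; the additional
+                    // blobs are newer than markStart and must survive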
+                    long deleteCount = sweep(fs, markStart);
+                    threw = false;
+                    
+                    LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
+                        deleteCount, maxLastModifiedInterval);
+                }
+            } finally {
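+                // Keep the garbage collector's working files around for inspection
+                // when trace logging is enabled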
+                if (!LOG.isTraceEnabled()) {
+                    Closeables.close(fs, threw);
+                }
+            }
+        }
+        
+        public HashSet<String> createAdditional() throws Exception {
+            HashSet<String> blobSet = new HashSet<String>();
+            NodeBuilder a = nodeStore.getRoot().builder();
+            int number = 5;
+            for (int i = 0; i < number; i++) {
+                SegmentBlob b = (SegmentBlob) nodeStore.createBlob(randomStream(100 + i, 16516));
+                a.child("cafter" + i).setProperty("x", b);
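+                // A blob may be stored as multiple chunks; record every chunk id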
+                Iterator<String> idIter =
+                    ((GarbageCollectableBlobStore) blobStore).resolveChunks(b.getBlobId());
+                while (idIter.hasNext()) {
+                    String chunk = idIter.next();
+                    blobSet.add(chunk);
+                }
+            }
+            log.info("{} Additional created {}", blobSet.size(), blobSet);
+            
+            nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            return blobSet;
+        }
+    }
 }