You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2020/04/27 10:00:47 UTC

svn commit: r1877058 - in /jackrabbit/oak/trunk: oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/ oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/ oak-blob-plugins/src/test/java/org/apache/jackrabbi...

Author: amitj
Date: Mon Apr 27 10:00:47 2020
New Revision: 1877058

URL: http://svn.apache.org/viewvc?rev=1877058&view=rev
Log:
OAK-9040: Option to only sweep in BlobGC when all references aged sufficiently

- Added an option for --collect-garbage to sweep only if the earliest references from all registered repositories are older than the max age.

Added:
    jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/MemoryBlobStoreNodeStore.java   (with props)
    jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/TimeLapsedDataStore.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
    jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
    jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
    jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
    jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
    jackrabbit/oak/trunk/oak-run/README.md
    jackrabbit/oak/trunk/oak-run/pom.xml
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreOptions.java
    jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandMetadataTest.java
    jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java

Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Mon Apr 27 10:00:47 2020
@@ -37,6 +37,7 @@ import java.io.InputStreamReader;
 import java.io.LineNumberReader;
 import java.sql.Timestamp;
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -78,6 +79,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.apache.jackrabbit.oak.stats.CounterStats;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.apache.jackrabbit.oak.stats.StatsOptions;
@@ -122,6 +124,9 @@ public class MarkSweepGarbageCollector i
     /** Flag to enable low cost consistency check after DSGC */
     private boolean checkConsistencyAfterGc;
 
+    /** Flag to sweep only if the references from all repositories are older than the retention time */
+    private final boolean sweepIfRefsPastRetention;
+
     /** Helper class to mark blob references which **/
     private final BlobReferenceRetriever marker;
 
@@ -150,6 +155,8 @@ public class MarkSweepGarbageCollector i
 
     private boolean traceOutput;
 
+    private Clock clock;
+
     /**
      * Creates an instance of MarkSweepGarbageCollector
      *
@@ -174,6 +181,7 @@ public class MarkSweepGarbageCollector i
             int batchCount,
             long maxLastModifiedInterval,
             boolean checkConsistencyAfterGc,
+            boolean sweepIfRefsPastRetention,
             @Nullable String repositoryId,
             @Nullable Whiteboard whiteboard,
             @Nullable StatisticsProvider statisticsProvider)
@@ -181,6 +189,7 @@ public class MarkSweepGarbageCollector i
         this.executor = executor;
         this.blobStore = blobStore;
         this.checkConsistencyAfterGc = checkConsistencyAfterGc;
+        this.sweepIfRefsPastRetention = sweepIfRefsPastRetention;
         checkNotNull(blobStore, "BlobStore cannot be null");
         this.marker = marker;
         this.batchCount = batchCount;
@@ -201,6 +210,7 @@ public class MarkSweepGarbageCollector i
         this.consistencyStats =
             new GarbageCollectionOperationStats(statisticsProvider, GarbageCollectionOperationStats.CONSISTENCY_NAME);
         this.consistencyStatsCollector = consistencyStats.getCollector();
+        this.clock = Clock.SIMPLE;
     }
 
     public MarkSweepGarbageCollector(
@@ -212,7 +222,7 @@ public class MarkSweepGarbageCollector i
             long maxLastModifiedInterval,
             @Nullable String repositoryId)
             throws IOException {
-        this(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, false, repositoryId, null, null);
+        this(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, false, false, repositoryId, null, null);
     }
 
     /**
@@ -227,7 +237,7 @@ public class MarkSweepGarbageCollector i
             @Nullable Whiteboard whiteboard,
             @Nullable StatisticsProvider statisticsProvider)
             throws IOException {
-        this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, maxLastModifiedInterval, false, repositoryId, whiteboard, statisticsProvider);
+        this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, maxLastModifiedInterval, false, false, repositoryId, whiteboard, statisticsProvider);
     }
 
     @Override
@@ -464,7 +474,8 @@ public class MarkSweepGarbageCollector i
         // Merge all the blob references available from all the reference files in the data store meta store
         // Only go ahead if merge succeeded
         earliestRefAvailTime =
-          GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs);
+          GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs, clock, maxLastModifiedInterval,
+              sweepIfRefsPastRetention);
         LOG.debug("Earliest reference available for timestamp [{}]", earliestRefAvailTime);
         earliestRefAvailTime = (earliestRefAvailTime < markStart ? earliestRefAvailTime : markStart);
 
@@ -745,6 +756,10 @@ public class MarkSweepGarbageCollector i
         traceOutput = trace;
     }
 
+    public void setClock(Clock clock) {
+        this.clock = clock;
+    }
+
     /**
      * BlobIdRetriever class to retrieve all blob ids.
      */
@@ -805,13 +820,15 @@ public class MarkSweepGarbageCollector i
              *
              * @param blobStore the blob store
              * @param fs the fs
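+             * @param maxLastModifiedInterval the interval subtracted from the current clock time to get the retention time
+             * @param sweepIfRefsPastRetention if true, also require references older than the retention time from all repositories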
              * @return the long the earliest time of the available references
              * @throws IOException Signals that an I/O exception has occurred.
              * @throws DataStoreException the data store exception
              */
             @Override
-            long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore,
-                    GarbageCollectorFileState fs)
+            long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs,
+                Clock clock, long maxLastModifiedInterval, boolean sweepIfRefsPastRetention)
                     throws IOException, DataStoreException {
 
                 List<DataRecord> refFiles =
@@ -824,7 +841,18 @@ public class MarkSweepGarbageCollector i
                 // Retrieve repos for which reference files have not been created
                 Set<String> unAvailRepos =
                         SharedDataStoreUtils.refsNotAvailableFromRepos(repoFiles, refFiles);
-                if (unAvailRepos.isEmpty()) {
+
+                Set<String> notOldRefs = Collections.EMPTY_SET;
+                long retentionTime = clock.getTime() - maxLastModifiedInterval;
+                LOG.info("Retention time calculated [{}]", retentionTime);
+
+                if (sweepIfRefsPastRetention) {
+                    notOldRefs =
+                        SharedDataStoreUtils.refsNotOld(repoFiles, refFiles, retentionTime);
+                    LOG.info("Repositories not having older references than retention time {}", notOldRefs);
+                }
+
+                if (unAvailRepos.isEmpty() && notOldRefs.isEmpty()) {
                     // List of files to be merged
                     List<File> files = newArrayList();
                     for (DataRecord refFile : refFiles) {
@@ -846,7 +874,8 @@ public class MarkSweepGarbageCollector i
 
                     return (earliestMarker < earliestRef ? earliestMarker : earliestRef);
                 } else {
-                    LOG.error("Not all repositories have marked references available : {}", unAvailRepos);
+                    LOG.error("Not all repositories have marked references available : {} or older than retention time: {}",
+                        unAvailRepos, notOldRefs);
                     throw new NotAllRepositoryMarkedException("Not all repositories have marked references available");
                 }
             }
@@ -894,8 +923,8 @@ public class MarkSweepGarbageCollector i
         void addMarked(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs, String repoId,
             String uniqueSuffix) throws DataStoreException, IOException {}
 
-        long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore,
-                GarbageCollectorFileState fs)
+        long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs,
+            Clock clock, long maxLastModifiedInterval, boolean sweepIfRefsPastRetention)
                 throws IOException, DataStoreException {
             // throw id the marked refs not available.
             if (!fs.getMarkedRefs().exists() || fs.getMarkedRefs().length() == 0) {

Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java Mon Apr 27 10:00:47 2020
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -91,6 +92,34 @@ public class SharedDataStoreUtils {
     }
 
     /**
+     * Returns the ids of the repositories which have no marked reference record older than the given time.
+     *
+     * @param repos the repository records
+     * @param refs the marked reference records
+     * @param referenceTime the retention time; only references last modified strictly before it qualify
+     * @return the ids of the repositories without any qualifying reference record (empty if all qualify)
+     */
+    public static Set<String> refsNotOld(List<DataRecord> repos,
+        List<DataRecord> refs, long referenceTime) {
+
+        // Ids of the repositories having at least one reference older than the retention time
+        Set<String> qualifyingRefs = refs.stream()
+            .filter(dataRecord -> dataRecord.getLastModified() < referenceTime)
+            .map(dataRecord ->
+                SharedStoreRecordType.MARKED_START_MARKER.getIdFromName(dataRecord.getIdentifier().toString()))
+            .collect(Collectors.toSet());
+
+        // Collect into an explicit HashSet as Collectors.toSet() does not guarantee a mutable set
+        Set<String> repoIds =
+            repos.stream()
+                .map(dataRecord -> SharedStoreRecordType.REPOSITORY.getIdFromName(dataRecord.getIdentifier().toString()))
+                .collect(Collectors.toCollection(java.util.HashSet::new));
+
+        repoIds.removeAll(qualifyingRefs);
+        return repoIds;
+    }
+
+    /**
      * Encapsulates the different type of records at the data store root.
      */
     public enum SharedStoreRecordType {

Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java Mon Apr 27 10:00:47 2020
@@ -19,7 +19,6 @@
 
 package org.apache.jackrabbit.oak.plugins.blob;
 
-import static org.apache.commons.codec.binary.Hex.encodeHexString;
 import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.CONSISTENCY_NAME;
 import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.FINISH_FAILURE;
 import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.NAME;
@@ -35,45 +34,25 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.security.DigestOutputStream;
-import java.security.MessageDigest;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicReference;
 
-import javax.jcr.RepositoryException;
-
 import ch.qos.logback.classic.Level;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closer;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.output.NullOutputStream;
-import org.apache.jackrabbit.core.data.DataIdentifier;
-import org.apache.jackrabbit.core.data.DataRecord;
-import org.apache.jackrabbit.core.data.DataStore;
-import org.apache.jackrabbit.core.data.DataStoreException;
-import org.apache.jackrabbit.core.data.MultiDataStoreAware;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
@@ -84,14 +63,6 @@ import org.apache.jackrabbit.oak.commons
 import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordAccessProvider;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadOptions;
-import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
-import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -104,8 +75,6 @@ import org.apache.jackrabbit.oak.spi.whi
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -147,7 +116,7 @@ public class BlobGCTest {
             }
         };
 
-        TimeLapsedDataStore dataStore = new TimeLapsedDataStore();
+        TimeLapsedDataStore dataStore = new TimeLapsedDataStore(clock);
         DataStoreBlobStore blobStore = new DataStoreBlobStore(dataStore);
         MemoryBlobStoreNodeStore nodeStore = new MemoryBlobStoreNodeStore(blobStore);
         cluster = new Cluster(folder.newFolder(), blobStore, nodeStore, 0);
@@ -170,6 +139,7 @@ public class BlobGCTest {
     class Cluster implements Closeable {
         protected final BlobStoreState blobStoreState;
         private final File root;
+        private final Clock clock;
         String repoId;
         protected final TimeLapsedDataStore dataStore;
         protected final GarbageCollectableBlobStore blobStore;
@@ -186,6 +156,8 @@ public class BlobGCTest {
             this.nodeStore = nodeStore;
             this.dataStore = (TimeLapsedDataStore) ((DataStoreBlobStore) blobStore).getDataStore();
             this.blobStore = blobStore;
+            this.clock = dataStore.getClock();
+
             if (SharedDataStoreUtils.isShared(blobStore)) {
                 repoId = ClusterRepositoryInfo.getOrCreateId(nodeStore);
                 ((SharedDataStore) blobStore).setRepositoryId(repoId);
@@ -205,13 +177,16 @@ public class BlobGCTest {
         }
 
         public MarkSweepGarbageCollector getCollector(long blobGcMaxAgeInSecs) throws Exception {
-            return getCollector(blobGcMaxAgeInSecs, false);
+            return getCollector(blobGcMaxAgeInSecs, false, false);
         }
 
-        public MarkSweepGarbageCollector getCollector(long blobGcMaxAgeInSecs, boolean checkConsistency) throws Exception {
+        public MarkSweepGarbageCollector getCollector(long blobGcMaxAgeInSecs, boolean checkConsistency,
+            boolean sweepIfRefsPastRetention) throws Exception {
+
             collector =
                 new MarkSweepGarbageCollector(referenceRetriever, blobStore, executor, root.getAbsolutePath(), 2048,
-                    blobGcMaxAgeInSecs, checkConsistency, repoId, wb, statsProvider);
+                    blobGcMaxAgeInSecs, checkConsistency, sweepIfRefsPastRetention, repoId, wb, statsProvider);
+            collector.setClock(clock);
             return collector;
         }
 
@@ -285,6 +260,63 @@ public class BlobGCTest {
     }
 
     @Test
+    public void sharedGCRefsOld() throws Exception {
+        log.info("Starting sharedGCRefsOld()");
+
+        // Setup a different cluster/repository sharing the blob store
+        MemoryBlobStoreNodeStore secondClusterNodeStore = new MemoryBlobStoreNodeStore(cluster.blobStore);
+        Cluster secondCluster = new Cluster(folder.newFolder(), cluster.blobStore, secondClusterNodeStore, 100);
+        closer.register(secondCluster);
+
+        Sets.SetView<String> totalPresent =
+            Sets.union(cluster.blobStoreState.blobsPresent, secondCluster.blobStoreState.blobsPresent);
+        Sets.SetView<String> totalAdded =
+            Sets.union(cluster.blobStoreState.blobsAdded, secondCluster.blobStoreState.blobsAdded);
+
+        clock.waitUntil(clock.getTime() + 5);
+
+        // Execute mark on both the default and the second cluster
+        executeGarbageCollection(cluster, cluster.getCollector(5), true);
+        executeGarbageCollection(secondCluster, secondCluster.getCollector(5), true);
+
+        clock.waitUntil(clock.getTime() + 5);
+
+        Set<String> existingAfterGC = executeGarbageCollection(secondCluster, secondCluster.getCollector(5, false, true), false);
+
+        assertTrue(Sets.symmetricDifference(totalPresent, existingAfterGC).isEmpty());
+        assertStats(secondCluster.statsProvider, 2, 0, totalAdded.size() - totalPresent.size(),
+            totalAdded.size() - totalPresent.size(), NAME);
+    }
+
+    @Test
+    public void sharedGCRefsNotOld() throws Exception {
+        log.info("Starting sharedGCRefsNotOld()");
+
+        // Setup a different cluster/repository sharing the blob store
+        MemoryBlobStoreNodeStore secondClusterNodeStore = new MemoryBlobStoreNodeStore(cluster.blobStore);
+        Cluster secondCluster = new Cluster(folder.newFolder(), cluster.blobStore, secondClusterNodeStore, 100);
+        closer.register(secondCluster);
+
+        Sets.SetView<String> totalPresent =
+            Sets.union(cluster.blobStoreState.blobsPresent, secondCluster.blobStoreState.blobsPresent);
+        Sets.SetView<String> totalAdded =
+            Sets.union(cluster.blobStoreState.blobsAdded, secondCluster.blobStoreState.blobsAdded);
+
+        // Execute mark on the default cluster
+        executeGarbageCollection(cluster, cluster.getCollector(5), true);
+
+        // Advance the clock before the second cluster marks so that its references do not pass the retention time
+        clock.waitUntil(clock.getTime() + 5);
+
+        executeGarbageCollection(secondCluster, secondCluster.getCollector(5), true);
+
+        Set<String> existingAfterGC = executeGarbageCollection(secondCluster, secondCluster.getCollector(6, false, true), false);
+
+        assertTrue(Sets.symmetricDifference(totalAdded, existingAfterGC).isEmpty());
+        assertStats(secondCluster.statsProvider, 2, 1, 0, 0, NAME);
+    }
+
+    @Test
     public void gc() throws Exception {
         log.info("Starting gc()");
 
@@ -300,7 +332,7 @@ public class BlobGCTest {
         log.info("Starting gcWithConsistencyCheck()");
         ((MemoryBlobStoreNodeStore) cluster.nodeStore).getReferencedBlobs().add("SPURIOUS");
 
-        MarkSweepGarbageCollector collector = cluster.getCollector(0, true);
+        MarkSweepGarbageCollector collector = cluster.getCollector(0, true, false);
         Set<String> existingAfterGC = executeGarbageCollection(cluster, collector, false);
         assertFalse(Sets.symmetricDifference(cluster.blobStoreState.blobsPresent, existingAfterGC).isEmpty());
         assertStats(cluster.statsProvider, 1, 0,
@@ -393,7 +425,7 @@ public class BlobGCTest {
         secondCluster.blobStoreState.blobsPresent.add(Iterables.firstOf(cluster.blobStoreState.blobsPresent));
         // Execute mark on the default cluster
         executeGarbageCollection(cluster, cluster.getCollector(0), true);
-        MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true);
+        MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true, false);
         long missing = globalCollector.checkConsistency();
         assertEquals(0, missing);
         assertStats(secondCluster.statsProvider, 1, 0, 0, 0, CONSISTENCY_NAME);
@@ -415,7 +447,7 @@ public class BlobGCTest {
 
         // Execute mark on the default cluster
         executeGarbageCollection(cluster, cluster.getCollector(0), true);
-        MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true);
+        MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true, false);
         long missing = globalCollector.checkConsistency();
         assertEquals(1, missing);
         assertStats(secondCluster.statsProvider, 1, 1, 1, 0, CONSISTENCY_NAME);
@@ -437,7 +469,7 @@ public class BlobGCTest {
 
         // Execute mark on the default cluster
         executeGarbageCollection(cluster, cluster.getCollector(0), true);
-        MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true);
+        MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true, false);
         long missing = globalCollector.checkConsistency();
         assertEquals(1, missing);
         assertStats(secondCluster.statsProvider, 1, 1, 1, 0, CONSISTENCY_NAME);
@@ -638,339 +670,4 @@ public class BlobGCTest {
         Set<String> blobsPresent = Sets.newHashSet();
     }
 
-    /**
-     * MemoryNodeStore extension which created blobs in the in-memory blob store
-     */
-    public static class MemoryBlobStoreNodeStore extends MemoryNodeStore {
-        private final BlobStore blobStore;
-        private final boolean fakePath;
-        Set<String> referencedBlobs;
-
-        public MemoryBlobStoreNodeStore(BlobStore blobStore) {
-            this(blobStore, false);
-        }
-
-        public MemoryBlobStoreNodeStore(BlobStore blobStore, boolean fakePath) {
-            this.blobStore = blobStore;
-            this.fakePath = fakePath;
-        }
-
-        public void setReferencedBlobs(Set<String> referencedBlobs) {
-            this.referencedBlobs = referencedBlobs;
-        }
-
-        public Set<String> getReferencedBlobs() {
-            return this.referencedBlobs;
-        }
-
-        @Override
-        public ArrayBasedBlob createBlob(InputStream in) {
-            try {
-                String id = blobStore.writeBlob(in);
-                return new TestBlob(id, blobStore);
-            } catch(Exception e) {
-                log.error("Error in createBlobs", e);
-            }
-            return null;
-        }
-
-        public BlobReferenceRetriever getBlobReferenceRetriever() {
-            return collector -> {
-                for (String id : referencedBlobs) {
-                    collector.addReference(id, (fakePath ? UUID.randomUUID().toString() : null));
-                }
-            };
-        }
-
-        static class TestBlob extends ArrayBasedBlob {
-            private String id;
-            private BlobStore blobStore;
-
-            public TestBlob(String id, BlobStore blobStore) {
-                super(new byte[0]);
-                this.id = id;
-                this.blobStore = blobStore;
-            }
-
-            @Override
-            public String getContentIdentity() {
-                return id;
-            }
-            @NotNull
-            @Override
-            public InputStream getNewStream() {
-                try {
-                    return blobStore.getInputStream(id);
-                } catch (IOException e) {
-                    log.error("Error in getNewStream", e);
-                }
-                return null;
-            }
-
-            @Override
-            public long length() {
-                try {
-                    return blobStore.getBlobLength(id);
-                } catch (IOException e) {
-                    log.error("Error in length", e);
-                }
-                return 0;
-            }
-        }
-    }
-
-    /**
-     * Test in memory DS to store the contents with an increasing time
-     */
-    class TimeLapsedDataStore implements DataStore, MultiDataStoreAware, SharedDataStore, DataRecordAccessProvider {
-        public static final int MIN_RECORD_LENGTH = 50;
-
-        private final long startTime;
-        Map<String, DataRecord> store;
-        Map<String, DataRecord> metadata;
-        Map<String, String> uploadTokens;
-
-        public TimeLapsedDataStore() {
-            this.startTime = clock.getTime();
-            store = Maps.newHashMap();
-            metadata = Maps.newHashMap();
-            uploadTokens = Maps.newHashMap();
-        }
-
-        @Override public DataRecord getRecordIfStored(DataIdentifier identifier) throws DataStoreException {
-            if (store.containsKey(identifier.toString())) {
-                return getRecord(identifier);
-            }
-            return null;
-        }
-
-        @Override public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
-            return store.get(identifier.toString());
-        }
-
-        @Override public DataRecord getRecordFromReference(String reference) throws DataStoreException {
-            return getRecord(new DataIdentifier(reference));
-        }
-
-        @Override public DataRecord addRecord(InputStream stream) throws DataStoreException {
-            try {
-                byte[] data = IOUtils.toByteArray(stream);
-                String id = getIdForInputStream(new ByteArrayInputStream(data));
-                TestRecord rec = new TestRecord(id, data, clock.getTime());
-                store.put(id, rec);
-                log.info("Blob created {} with timestamp {}", rec.id, rec.lastModified);
-                return rec;
-            } catch (Exception e) {
-                throw new DataStoreException(e);
-            }
-
-        }
-
-        @Override public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
-            return  Iterators.transform(store.keySet().iterator(), input -> new DataIdentifier(input));
-        }
-
-        @Override public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
-            store.remove(identifier.toString());
-        }
-
-        /***************************************** SharedDataStore ***************************************/
-
-        @Override public void addMetadataRecord(InputStream stream, String name) throws DataStoreException {
-            try {
-                byte[] data = IOUtils.toByteArray(stream);
-                TestRecord rec = new TestRecord(name, data, clock.getTime());
-                metadata.put(name, rec);
-                log.info("Metadata created {} with timestamp {}", rec.id, rec.lastModified);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-
-        @Override public void addMetadataRecord(File f, String name) throws DataStoreException {
-            FileInputStream fstream = null;
-            try {
-                fstream = new FileInputStream(f);
-                addMetadataRecord(fstream, name);
-            } catch (Exception e) {
-                e.printStackTrace();
-            } finally {
-                IOUtils.closeQuietly(fstream);
-            }
-        }
-
-        @Override public DataRecord getMetadataRecord(String name) {
-            return metadata.get(name);
-        }
-
-        @Override public boolean metadataRecordExists(String name) {
-            return metadata.containsKey(name);
-        }
-
-        @Override public List<DataRecord> getAllMetadataRecords(String prefix) {
-            List<DataRecord> recs = Lists.newArrayList();
-            Iterator<Map.Entry<String, DataRecord>> iter = metadata.entrySet().iterator();
-            while (iter.hasNext()) {
-                Map.Entry<String, DataRecord> entry = iter.next();
-                if (entry.getKey().startsWith(prefix)) {
-                    recs.add(entry.getValue());
-                }
-            }
-            return recs;
-        }
-
-        @Override public boolean deleteMetadataRecord(String name) {
-            metadata.remove(name);
-            if (!metadata.containsKey(name)) {
-                return true;
-            }
-            return false;
-        }
-
-        @Override public void deleteAllMetadataRecords(String prefix) {
-            List<String> recs = Lists.newArrayList();
-            Iterator<Map.Entry<String, DataRecord>> iter = metadata.entrySet().iterator();
-            while (iter.hasNext()) {
-                Map.Entry<String, DataRecord> entry = iter.next();
-                if (entry.getKey().startsWith(prefix)) {
-                    recs.add(entry.getKey());
-                }
-            }
-
-            for(String key: recs) {
-                metadata.remove(key);
-            }
-        }
-
-        @Override public Iterator<DataRecord> getAllRecords() throws DataStoreException {
-            return store.values().iterator();
-        }
-
-        @Override public DataRecord getRecordForId(DataIdentifier id) throws DataStoreException {
-            return store.get(id.toString());
-        }
-
-        @Override public SharedDataStore.Type getType() {
-            return SharedDataStore.Type.SHARED;
-        }
-
-        /**************************** DataRecordAccessProvider *************************/
-
-        @Override public @Nullable URI getDownloadURI(@NotNull DataIdentifier identifier,
-            @NotNull DataRecordDownloadOptions downloadOptions) {
-            return null;
-        }
-
-        @Override
-        public @Nullable DataRecordUpload initiateDataRecordUpload(long maxUploadSizeInBytes, int maxNumberOfURIs)
-                throws IllegalArgumentException, DataRecordUploadException {
-            return initiateDataRecordUpload(maxUploadSizeInBytes, maxNumberOfURIs, DataRecordUploadOptions.DEFAULT);
-        }
-
-        @Override
-        public @Nullable DataRecordUpload initiateDataRecordUpload(long maxUploadSizeInBytes, int maxNumberOfURIs, @NotNull final DataRecordUploadOptions options)
-                throws IllegalArgumentException, DataRecordUploadException {
-            String upToken = UUID.randomUUID().toString();
-            Random rand = new Random();
-            InputStream stream = randomStream(rand.nextInt(1000), 100);
-            byte[] data = new byte[0];
-            try {
-                data = IOUtils.toByteArray(stream);
-            } catch (IOException e) {
-                throw new DataRecordUploadException(e);
-            }
-            TestRecord rec = new TestRecord(upToken, data, clock.getTime());
-            store.put(upToken, rec);
-
-            DataRecordUpload uploadRec = new DataRecordUpload() {
-                @Override public @NotNull String getUploadToken() {
-                    return upToken;
-                }
-
-                @Override public long getMinPartSize() {
-                    return maxUploadSizeInBytes;
-                }
-
-                @Override public long getMaxPartSize() {
-                    return maxUploadSizeInBytes;
-                }
-
-                @Override public @NotNull Collection<URI> getUploadURIs() {
-                    return Collections.EMPTY_LIST;
-                }
-            };
-            return uploadRec;
-        }
-
-        @Override public @NotNull DataRecord completeDataRecordUpload(@NotNull String uploadToken)
-            throws IllegalArgumentException, DataRecordUploadException, DataStoreException {
-            return store.get(uploadToken);
-        }
-
-        class TestRecord implements DataRecord {
-            String id;
-            byte[] data;
-            long lastModified;
-
-            public TestRecord(String id, byte[] data, long lastModified) {
-                this.id = id;
-                this.data = data;
-                this.lastModified = lastModified;
-            }
-
-            @Override public DataIdentifier getIdentifier() {
-                return new DataIdentifier(id);
-            }
-
-            @Override public String getReference() {
-                return id;
-            }
-
-            @Override public long getLength() throws DataStoreException {
-                return data.length;
-            }
-
-            @Override public InputStream getStream() throws DataStoreException {
-                return new ByteArrayInputStream(data);
-            }
-
-            @Override public long getLastModified() {
-                return lastModified;
-            }
-        }
-
-        private String getIdForInputStream(final InputStream in)
-            throws Exception {
-            MessageDigest digest = MessageDigest.getInstance("SHA-256");
-            OutputStream output = new DigestOutputStream(new NullOutputStream(), digest);
-            try {
-                IOUtils.copyLarge(in, output);
-            } finally {
-                IOUtils.closeQuietly(output);
-                IOUtils.closeQuietly(in);
-            }
-            return encodeHexString(digest.digest());
-        }
-
-        /*************************************** No Op ***********************/
-        @Override public void init(String homeDir) throws RepositoryException {
-        }
-
-        @Override public void updateModifiedDateOnAccess(long before) {
-        }
-
-        @Override public int deleteAllOlderThan(long min) throws DataStoreException {
-            return 0;
-        }
-
-        @Override public int getMinRecordLength() {
-            return MIN_RECORD_LENGTH;
-        }
-
-        @Override public void close() throws DataStoreException {
-        }
-
-        @Override public void clearInUse() {
-        }
-    }
 }

Added: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/MemoryBlobStoreNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/MemoryBlobStoreNodeStore.java?rev=1877058&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/MemoryBlobStoreNodeStore.java (added)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/MemoryBlobStoreNodeStore.java Mon Apr 27 10:00:47 2020
@@ -0,0 +1,92 @@
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * MemoryNodeStore extension which creates blobs in the in-memory blob store
+ */
+public class MemoryBlobStoreNodeStore extends MemoryNodeStore {
+    private final BlobStore blobStore;
+    private final boolean fakePath;
+    Set<String> referencedBlobs;
+
+    public MemoryBlobStoreNodeStore(BlobStore blobStore) {
+        this(blobStore, false);
+    }
+
+    public MemoryBlobStoreNodeStore(BlobStore blobStore, boolean fakePath) {
+        this.blobStore = blobStore;
+        this.fakePath = fakePath;
+    }
+
+    public void setReferencedBlobs(Set<String> referencedBlobs) {
+        this.referencedBlobs = referencedBlobs;
+    }
+
+    public Set<String> getReferencedBlobs() {
+        return this.referencedBlobs;
+    }
+
+    @Override
+    public ArrayBasedBlob createBlob(InputStream in) {
+        try {
+            String id = blobStore.writeBlob(in);
+            return new TestBlob(id, blobStore);
+        } catch(Exception e) {
+            BlobGCTest.log.error("Error in createBlobs", e);
+        }
+        return null;
+    }
+
+    public BlobReferenceRetriever getBlobReferenceRetriever() {
+        return collector -> {
+            for (String id : referencedBlobs) {
+                collector.addReference(id, (fakePath ? UUID.randomUUID().toString() : null));
+            }
+        };
+    }
+
+    static class TestBlob extends ArrayBasedBlob {
+        private String id;
+        private BlobStore blobStore;
+
+        public TestBlob(String id, BlobStore blobStore) {
+            super(new byte[0]);
+            this.id = id;
+            this.blobStore = blobStore;
+        }
+
+        @Override
+        public String getContentIdentity() {
+            return id;
+        }
+        @NotNull
+        @Override
+        public InputStream getNewStream() {
+            try {
+                return blobStore.getInputStream(id);
+            } catch (IOException e) {
+                BlobGCTest.log.error("Error in getNewStream", e);
+            }
+            return null;
+        }
+
+        @Override
+        public long length() {
+            try {
+                return blobStore.getBlobLength(id);
+            } catch (IOException e) {
+                BlobGCTest.log.error("Error in length", e);
+            }
+            return 0;
+        }
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/MemoryBlobStoreNodeStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java Mon Apr 27 10:00:47 2020
@@ -91,6 +91,7 @@ public class SharedDataStoreMarkSweepGar
       1,
       0L,
       false,
+      false,
       "repo",
       whiteboard,
       new DefaultStatisticsProvider(executor)

Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java Mon Apr 27 10:00:47 2020
@@ -23,12 +23,15 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
@@ -45,6 +48,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -217,6 +221,96 @@ public class SharedDataStoreUtilsTest {
     }
 
     @Test
+    public void testRefsOld() throws Exception {
+        Clock clock = new Clock.Virtual();
+        TimeLapsedDataStore ds = new TimeLapsedDataStore(clock);
+        Data data = new Data();
+
+        String repoId1 = UUID.randomUUID().toString();
+
+        data.repoIds.add(repoId1);
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REPOSITORY.getNameFromId(repoId1));
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.MARKED_START_MARKER.getNameFromId(repoId1));
+
+        String repoId2 = UUID.randomUUID().toString();
+
+        data.repoIds.add(repoId2);
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REPOSITORY.getNameFromId(repoId2));
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.MARKED_START_MARKER.getNameFromId(repoId2));
+
+        clock.waitUntil(10);
+        // All the references from registered repositories are available
+        Set<String> repos = SharedDataStoreUtils
+            .refsNotOld(ds.getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()),
+                ds.getAllMetadataRecords(SharedStoreRecordType.MARKED_START_MARKER.getType()), 5);
+        assertEquals(Collections.EMPTY_SET, repos);
+    }
+
+    @Test
+    public void testRefsNotOldOne() throws Exception {
+        Clock clock = new Clock.Virtual();
+        TimeLapsedDataStore ds = new TimeLapsedDataStore(clock);
+        Data data = new Data();
+
+        String repoId1 = UUID.randomUUID().toString();
+
+        data.repoIds.add(repoId1);
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REPOSITORY.getNameFromId(repoId1));
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.MARKED_START_MARKER.getNameFromId(repoId1));
+
+        String repoId2 = UUID.randomUUID().toString();
+
+        data.repoIds.add(repoId2);
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REPOSITORY.getNameFromId(repoId2));
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.MARKED_START_MARKER.getNameFromId(repoId2));
+
+        clock.waitUntil(10);
+        // Only references from the first registered repository are available
+        Set<String> repos = SharedDataStoreUtils
+            .refsNotOld(ds.getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()),
+                ds.getAllMetadataRecords(SharedStoreRecordType.MARKED_START_MARKER.getType()), 3);
+        assertEquals(Stream.of(repoId2).collect(Collectors.toSet()), repos);
+    }
+
+    @Test
+    public void testRefsNotOldAll() throws Exception {
+        Clock clock = new Clock.Virtual();
+        TimeLapsedDataStore ds = new TimeLapsedDataStore(clock);
+        Data data = new Data();
+
+        String repoId1 = UUID.randomUUID().toString();
+
+        data.repoIds.add(repoId1);
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REPOSITORY.getNameFromId(repoId1));
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.MARKED_START_MARKER.getNameFromId(repoId1));
+
+        String repoId2 = UUID.randomUUID().toString();
+
+        data.repoIds.add(repoId2);
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REPOSITORY.getNameFromId(repoId2));
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.MARKED_START_MARKER.getNameFromId(repoId2));
+
+        clock.waitUntil(5);
+        // none of the references from registered repositories are available
+        Set<String> repos = SharedDataStoreUtils
+            .refsNotOld(ds.getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()),
+                ds.getAllMetadataRecords(SharedStoreRecordType.MARKED_START_MARKER.getType()), 2);
+        assertEquals(Stream.of(repoId1, repoId2).collect(Collectors.toSet()), repos);
+    }
+
+    @Test
     public void repoMarkerExistOnClose() throws Exception {
         File rootFolder = folder.newFolder();
         dataStore = getBlobStore(rootFolder);

Added: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/TimeLapsedDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/TimeLapsedDataStore.java?rev=1877058&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/TimeLapsedDataStore.java (added)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/TimeLapsedDataStore.java Mon Apr 27 10:00:47 2020
@@ -0,0 +1,303 @@
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import javax.jcr.RepositoryException;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.MultiDataStoreAware;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordAccessProvider;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadOptions;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.commons.codec.binary.Hex.encodeHexString;
+import static org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.randomStream;
+
+/**
+ * In-memory test data store which stamps every record with the current time of the supplied clock
+ */
+public class TimeLapsedDataStore implements DataStore, MultiDataStoreAware, SharedDataStore, DataRecordAccessProvider {
+    public static final int MIN_RECORD_LENGTH = 50;
+
+    private final long startTime;
+    private Clock clock;
+    Map<String, DataRecord> store;
+    Map<String, DataRecord> metadata;
+    Map<String, String> uploadTokens;
+
+    public TimeLapsedDataStore(Clock clock) {
+        this.startTime = clock.getTime();
+        this.clock = clock;
+        store = Maps.newHashMap();
+        metadata = Maps.newHashMap();
+        uploadTokens = Maps.newHashMap();
+    }
+
+    protected Clock getClock() {
+        return clock;
+    }
+
+    @Override public DataRecord getRecordIfStored(DataIdentifier identifier) throws DataStoreException {
+        if (store.containsKey(identifier.toString())) {
+            return getRecord(identifier);
+        }
+        return null;
+    }
+
+    @Override public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
+        return store.get(identifier.toString());
+    }
+
+    @Override public DataRecord getRecordFromReference(String reference) throws DataStoreException {
+        return getRecord(new DataIdentifier(reference));
+    }
+
+    @Override public DataRecord addRecord(InputStream stream) throws DataStoreException {
+        try {
+            byte[] data = IOUtils.toByteArray(stream);
+            String id = getIdForInputStream(new ByteArrayInputStream(data));
+            TestRecord rec = new TestRecord(id, data, clock.getTime());
+            store.put(id, rec);
+            BlobGCTest.log.info("Blob created {} with timestamp {}", rec.id, rec.lastModified);
+            return rec;
+        } catch (Exception e) {
+            throw new DataStoreException(e);
+        }
+
+    }
+
+    @Override public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
+        return  Iterators.transform(store.keySet().iterator(), input -> new DataIdentifier(input));
+    }
+
+    @Override public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
+        store.remove(identifier.toString());
+    }
+
+    /***************************************** SharedDataStore ***************************************/
+
+    @Override public void addMetadataRecord(InputStream stream, String name) throws DataStoreException {
+        try {
+            byte[] data = IOUtils.toByteArray(stream);
+            TestRecord rec = new TestRecord(name, data, clock.getTime());
+            metadata.put(name, rec);
+            BlobGCTest.log.info("Metadata created {} with timestamp {}", rec.id, rec.lastModified);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override public void addMetadataRecord(File f, String name) throws DataStoreException {
+        FileInputStream fstream = null;
+        try {
+            fstream = new FileInputStream(f);
+            addMetadataRecord(fstream, name);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            IOUtils.closeQuietly(fstream);
+        }
+    }
+
+    @Override public DataRecord getMetadataRecord(String name) {
+        return metadata.get(name);
+    }
+
+    @Override public boolean metadataRecordExists(String name) {
+        return metadata.containsKey(name);
+    }
+
+    @Override public List<DataRecord> getAllMetadataRecords(String prefix) {
+        List<DataRecord> recs = Lists.newArrayList();
+        Iterator<Map.Entry<String, DataRecord>> iter = metadata.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, DataRecord> entry = iter.next();
+            if (entry.getKey().startsWith(prefix)) {
+                recs.add(entry.getValue());
+            }
+        }
+        return recs;
+    }
+
+    @Override public boolean deleteMetadataRecord(String name) {
+        metadata.remove(name);
+        if (!metadata.containsKey(name)) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override public void deleteAllMetadataRecords(String prefix) {
+        List<String> recs = Lists.newArrayList();
+        Iterator<Map.Entry<String, DataRecord>> iter = metadata.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, DataRecord> entry = iter.next();
+            if (entry.getKey().startsWith(prefix)) {
+                recs.add(entry.getKey());
+            }
+        }
+
+        for(String key: recs) {
+            metadata.remove(key);
+        }
+    }
+
+    @Override public Iterator<DataRecord> getAllRecords() throws DataStoreException {
+        return store.values().iterator();
+    }
+
+    @Override public DataRecord getRecordForId(DataIdentifier id) throws DataStoreException {
+        return store.get(id.toString());
+    }
+
+    @Override public Type getType() {
+        return Type.SHARED;
+    }
+
+    /**************************** DataRecordAccessProvider *************************/
+
+    @Override public @Nullable URI getDownloadURI(@NotNull DataIdentifier identifier,
+        @NotNull DataRecordDownloadOptions downloadOptions) {
+        return null;
+    }
+
+    @Override
+    public @Nullable DataRecordUpload initiateDataRecordUpload(long maxUploadSizeInBytes, int maxNumberOfURIs)
+            throws IllegalArgumentException, DataRecordUploadException {
+        return initiateDataRecordUpload(maxUploadSizeInBytes, maxNumberOfURIs, DataRecordUploadOptions.DEFAULT);
+    }
+
+    @Override
+    public @Nullable DataRecordUpload initiateDataRecordUpload(long maxUploadSizeInBytes, int maxNumberOfURIs, @NotNull final DataRecordUploadOptions options)
+            throws IllegalArgumentException, DataRecordUploadException {
+        String upToken = UUID.randomUUID().toString();
+        Random rand = new Random();
+        InputStream stream = randomStream(rand.nextInt(1000), 100);
+        byte[] data = new byte[0];
+        try {
+            data = IOUtils.toByteArray(stream);
+        } catch (IOException e) {
+            throw new DataRecordUploadException(e);
+        }
+        TestRecord rec = new TestRecord(upToken, data, clock.getTime());
+        store.put(upToken, rec);
+
+        DataRecordUpload uploadRec = new DataRecordUpload() {
+            @Override public @NotNull String getUploadToken() {
+                return upToken;
+            }
+
+            @Override public long getMinPartSize() {
+                return maxUploadSizeInBytes;
+            }
+
+            @Override public long getMaxPartSize() {
+                return maxUploadSizeInBytes;
+            }
+
+            @Override public @NotNull Collection<URI> getUploadURIs() {
+                return Collections.EMPTY_LIST;
+            }
+        };
+        return uploadRec;
+    }
+
+    @Override public @NotNull DataRecord completeDataRecordUpload(@NotNull String uploadToken)
+        throws IllegalArgumentException, DataRecordUploadException, DataStoreException {
+        return store.get(uploadToken);
+    }
+
+    class TestRecord implements DataRecord {
+        String id;
+        byte[] data;
+        long lastModified;
+
+        public TestRecord(String id, byte[] data, long lastModified) {
+            this.id = id;
+            this.data = data;
+            this.lastModified = lastModified;
+        }
+
+        @Override public DataIdentifier getIdentifier() {
+            return new DataIdentifier(id);
+        }
+
+        @Override public String getReference() {
+            return id;
+        }
+
+        @Override public long getLength() throws DataStoreException {
+            return data.length;
+        }
+
+        @Override public InputStream getStream() throws DataStoreException {
+            return new ByteArrayInputStream(data);
+        }
+
+        @Override public long getLastModified() {
+            return lastModified;
+        }
+    }
+
+    private String getIdForInputStream(final InputStream in)
+        throws Exception {
+        MessageDigest digest = MessageDigest.getInstance("SHA-256");
+        OutputStream output = new DigestOutputStream(new NullOutputStream(), digest);
+        try {
+            IOUtils.copyLarge(in, output);
+        } finally {
+            IOUtils.closeQuietly(output);
+            IOUtils.closeQuietly(in);
+        }
+        return encodeHexString(digest.digest());
+    }
+
+    /*************************************** No Op ***********************/
+    @Override public void init(String homeDir) throws RepositoryException {
+    }
+
+    @Override public void updateModifiedDateOnAccess(long before) {
+    }
+
+    @Override public int deleteAllOlderThan(long min) throws DataStoreException {
+        return 0;
+    }
+
+    @Override public int getMinRecordLength() {
+        return MIN_RECORD_LENGTH;
+    }
+
+    @Override public void close() throws DataStoreException {
+    }
+
+    @Override public void clearInUse() {
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/TimeLapsedDataStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-run/README.md
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/README.md?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/README.md (original)
+++ jackrabbit/oak/trunk/oak-run/README.md Mon Apr 27 10:00:47 2020
@@ -595,7 +595,10 @@ The following options are available:
     --export-metrics         - Option to export the captured metrics. The format of the command is type;URL;key1=value1,key2=value2
                               Currently only [Prometheus Pushgateway](https://github.com/prometheus/pushgateway) is supported
                               e.g. --export-metrics "pushgateway;localhost:9091;key1=value1,key2=value2" 
-
+    --sweep-only-refs-past-retention - Sweep only if the earliest references from all repositories are past the retention period, which is governed by the max-age parameter.
+                                       Boolean (Optional). Defaults to False. Only applicable for --collect-garbage
+    --check-consistency-gc    - Performs a consistency check immediately after the GC.        
+                                Boolean (Optional). Defaults to False. Only applicable for --collect-garbage                           
 Note:
 
 Note: When using --export-metrics the following additional jars have to be downloaded to support Prometheus Pushgatway

Modified: jackrabbit/oak/trunk/oak-run/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/pom.xml?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-run/pom.xml Mon Apr 27 10:00:47 2020
@@ -524,6 +524,13 @@
           <type>test-jar</type>
           <scope>test</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.jackrabbit</groupId>
+          <artifactId>oak-blob-plugins</artifactId>
+          <version>${project.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
     </profile>
   </profiles>

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java Mon Apr 27 10:00:47 2020
@@ -352,7 +352,8 @@ public class DataStoreCommand implements
             new MarkSweepGarbageCollector(retriever, (GarbageCollectableBlobStore) fixture.getBlobStore(), service,
                 dataStoreOpts.getOutDir().getAbsolutePath(), dataStoreOpts.getBatchCount(),
                 SECONDS.toMillis(dataStoreOpts.getBlobGcMaxAgeInSecs()), dataStoreOpts.checkConsistencyAfterGC(),
-                repositoryId, fixture.getWhiteboard(), getService(fixture.getWhiteboard(), StatisticsProvider.class));
+                dataStoreOpts.sweepIfRefsPastRetention(), repositoryId, fixture.getWhiteboard(),
+                getService(fixture.getWhiteboard(), StatisticsProvider.class));
         collector.setTraceOutput(true);
 
         return collector;

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreOptions.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreOptions.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreOptions.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreOptions.java Mon Apr 27 10:00:47 2020
@@ -56,6 +56,7 @@ public class DataStoreOptions implements
     private final OptionSpec<Boolean> resetLoggingConfig;
     private OptionSpec<String> exportMetrics;
     private static final String DELIM = ",";
+    private OptionSpec<Boolean> sweepIfRefsPastRetention;
 
     public DataStoreOptions(OptionParser parser) {
         collectGarbage = parser.accepts("collect-garbage",
@@ -67,6 +68,10 @@ public class DataStoreOptions implements
             "Performs a consistency check immediately after DSGC")
             .withOptionalArg().ofType(Boolean.class).defaultsTo(Boolean.FALSE);
 
+        sweepIfRefsPastRetention = parser.accepts("sweep-only-refs-past-retention",
+            "Only allows sweep if all references available older than retention time (Default false)")
+            .withOptionalArg().ofType(Boolean.class).defaultsTo(Boolean.FALSE);
+
         consistencyCheck =
             parser.accepts("check-consistency", "Performs a consistency check on the repository/datastore defined");
 
@@ -233,4 +238,7 @@ public class DataStoreOptions implements
         return options.valuesOf(verbosePathInclusionRegex);
     }
 
+    public boolean sweepIfRefsPastRetention() {
+        return options.has(sweepIfRefsPastRetention) && sweepIfRefsPastRetention.value(options) ;
+    }
 }

Modified: jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandMetadataTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandMetadataTest.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandMetadataTest.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandMetadataTest.java Mon Apr 27 10:00:47 2020
@@ -32,7 +32,7 @@ import com.google.common.collect.Sets;
 import org.apache.jackrabbit.core.data.DataRecord;
 import org.apache.jackrabbit.core.data.DataStoreException;
 import org.apache.jackrabbit.oak.commons.FileIOUtils;
-import org.apache.jackrabbit.oak.plugins.blob.BlobGCTest.MemoryBlobStoreNodeStore;
+import org.apache.jackrabbit.oak.plugins.blob.MemoryBlobStoreNodeStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.run.DataStoreCommandTest.DataStoreFixture;
 import org.apache.jackrabbit.oak.run.DataStoreCommandTest.StoreFixture;

Modified: jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java Mon Apr 27 10:00:47 2020
@@ -58,7 +58,7 @@ import org.apache.jackrabbit.oak.blob.cl
 import org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils;
 import org.apache.jackrabbit.oak.commons.FileIOUtils;
 import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
-import org.apache.jackrabbit.oak.plugins.blob.BlobGCTest.MemoryBlobStoreNodeStore;
+import org.apache.jackrabbit.oak.plugins.blob.MemoryBlobStoreNodeStore;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.OakFileDataStore;
@@ -527,7 +527,7 @@ public class DataStoreCommandTest {
         Data data = prepareData(storeFixture, blobFixture, 10, 5, 1);
         storeFixture.close();
         additionalParams += " --check-consistency-gc true";
-        testGc(dump, data, 0, false);
+        testGc(dump, data, 0, false, false);
 
         assertFileEquals(dump, "avail-", Sets.difference(data.added, data.missingDataStore));
 
@@ -536,12 +536,30 @@ public class DataStoreCommandTest {
     }
 
     @Test
+    public void gcSweepRefsOld() throws Exception {
+        File dump = temporaryFolder.newFolder();
+        Data data = prepareData(storeFixture, blobFixture, 10, 5, 0);
+        storeFixture.close();
+        additionalParams += " --check-consistency-gc true --sweep-only-refs-past-retention true";
+        testGc(dump, data, 0, false, false);
+    }
+
+    @Test
+    public void gcSweepRefsNotOld() throws Exception {
+        File dump = temporaryFolder.newFolder();
+        Data data = prepareData(storeFixture, blobFixture, 10, 5, 0);
+        storeFixture.close();
+        additionalParams += " --check-consistency-gc true --sweep-only-refs-past-retention true";
+        testGc(dump, data, 1000, false, true);
+    }
+
+    @Test
     public void gc() throws Exception {
         File dump = temporaryFolder.newFolder();
         Data data = prepareData(storeFixture, blobFixture, 10, 5, 1);
         storeFixture.close();
 
-        testGc(dump, data, 0, false);
+        testGc(dump, data, 0, false, false);
     }
     /*
     Command should throw and exception if --verboseRootPath specified
@@ -555,7 +573,7 @@ public class DataStoreCommandTest {
 
         additionalParams += " --verboseRootPath /a";
         try {
-            testGc(dump, data, 0, false);
+            testGc(dump, data, 0, false, false);
         } catch (OptionException e) {
             assertTrue(e.getMessage().equals("Option(s) [verboseRootPath] are unavailable " +
                     "given other options on the command line"));
@@ -570,7 +588,7 @@ public class DataStoreCommandTest {
         Data data = prepareData(storeFixture, blobFixture, 10, 0, 1);
         storeFixture.close();
 
-        testGc(dump, data, 0, false);
+        testGc(dump, data, 0, false, false);
     }
 
     @Test
@@ -579,7 +597,7 @@ public class DataStoreCommandTest {
         Data data = prepareData(storeFixture, blobFixture, 10, 5, 1);
         storeFixture.close();
 
-        testGc(dump, data, 10000, false);
+        testGc(dump, data, 10000, false, false);
     }
 
     @Test
@@ -588,7 +606,7 @@ public class DataStoreCommandTest {
         Data data = prepareData(storeFixture, blobFixture, 10, 5, 1);
         storeFixture.close();
 
-        testGc(dump, data, 10000, true);
+        testGc(dump, data, 10000, true, false);
     }
 
     @Test
@@ -607,7 +625,7 @@ public class DataStoreCommandTest {
 
         storeFixture.close();
 
-        testGc(dump, data, 0, false);
+        testGc(dump, data, 0, false, false);
     }
 
     @Test
@@ -744,7 +762,7 @@ public class DataStoreCommandTest {
 
     }
 
-    private void testGc(File dump, Data data, long maxAge, boolean markOnly) throws Exception {
+    private void testGc(File dump, Data data, long maxAge, boolean markOnly, boolean refsOld) throws Exception {
         List<String> argsList = Lists
             .newArrayList("--collect-garbage", String.valueOf(markOnly), "--max-age", String.valueOf(maxAge),
                 "--" + getOption(blobFixture.getType()), blobFixture.getConfigPath(),
@@ -757,14 +775,14 @@ public class DataStoreCommandTest {
         DataStoreCommand cmd = new DataStoreCommand();
         cmd.execute(argsList.toArray(new String[0]));
 
-        if (!markOnly) {
+        if (!markOnly && !refsOld) {
             assertFileEquals(dump, "avail-", Sets.difference(data.added, data.missingDataStore));
         } else {
             assertFileNull(dump, "avail-");
         }
 
         assertFileEquals(dump, "marked-", Sets.difference(data.added, data.deleted));
-        if (!markOnly) {
+        if (!markOnly && !refsOld) {
             assertFileEquals(dump, "gccand-", data.deleted);
         } else {
             assertFileNull(dump, "gccand-");