You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2020/04/27 10:00:47 UTC
svn commit: r1877058 - in /jackrabbit/oak/trunk:
oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/
oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/
oak-blob-plugins/src/test/java/org/apache/jackrabbi...
Author: amitj
Date: Mon Apr 27 10:00:47 2020
New Revision: 1877058
URL: http://svn.apache.org/viewvc?rev=1877058&view=rev
Log:
OAK-9040: Option to only sweep in BlobGC when all references aged sufficiently
- Added an option to --collect-garbage to sweep only if the earliest references from all registered repositories are older than the max age.
Added:
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/MemoryBlobStoreNodeStore.java (with props)
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/TimeLapsedDataStore.java (with props)
Modified:
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
jackrabbit/oak/trunk/oak-run/README.md
jackrabbit/oak/trunk/oak-run/pom.xml
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreOptions.java
jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandMetadataTest.java
jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java
Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Mon Apr 27 10:00:47 2020
@@ -37,6 +37,7 @@ import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.sql.Timestamp;
import java.util.ArrayDeque;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -78,6 +79,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.stats.CounterStats;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatsOptions;
@@ -122,6 +124,9 @@ public class MarkSweepGarbageCollector i
/** Flag to enable low cost consistency check after DSGC */
private boolean checkConsistencyAfterGc;
+ /** Flag to skip the sweep unless every registered repository has references older than the retention time */
+ private final boolean sweepIfRefsPastRetention;
+
/** Helper class to mark blob references which **/
private final BlobReferenceRetriever marker;
@@ -150,6 +155,8 @@ public class MarkSweepGarbageCollector i
private boolean traceOutput;
+ private Clock clock;
+
/**
* Creates an instance of MarkSweepGarbageCollector
*
@@ -174,6 +181,7 @@ public class MarkSweepGarbageCollector i
int batchCount,
long maxLastModifiedInterval,
boolean checkConsistencyAfterGc,
+ boolean sweepIfRefsPastRetention,
@Nullable String repositoryId,
@Nullable Whiteboard whiteboard,
@Nullable StatisticsProvider statisticsProvider)
@@ -181,6 +189,7 @@ public class MarkSweepGarbageCollector i
this.executor = executor;
this.blobStore = blobStore;
this.checkConsistencyAfterGc = checkConsistencyAfterGc;
+ this.sweepIfRefsPastRetention = sweepIfRefsPastRetention;
checkNotNull(blobStore, "BlobStore cannot be null");
this.marker = marker;
this.batchCount = batchCount;
@@ -201,6 +210,7 @@ public class MarkSweepGarbageCollector i
this.consistencyStats =
new GarbageCollectionOperationStats(statisticsProvider, GarbageCollectionOperationStats.CONSISTENCY_NAME);
this.consistencyStatsCollector = consistencyStats.getCollector();
+ this.clock = Clock.SIMPLE;
}
public MarkSweepGarbageCollector(
@@ -212,7 +222,7 @@ public class MarkSweepGarbageCollector i
long maxLastModifiedInterval,
@Nullable String repositoryId)
throws IOException {
- this(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, false, repositoryId, null, null);
+ this(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, false, false, repositoryId, null, null);
}
/**
@@ -227,7 +237,7 @@ public class MarkSweepGarbageCollector i
@Nullable Whiteboard whiteboard,
@Nullable StatisticsProvider statisticsProvider)
throws IOException {
- this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, maxLastModifiedInterval, false, repositoryId, whiteboard, statisticsProvider);
+ this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, maxLastModifiedInterval, false, false, repositoryId, whiteboard, statisticsProvider);
}
@Override
@@ -464,7 +474,8 @@ public class MarkSweepGarbageCollector i
// Merge all the blob references available from all the reference files in the data store meta store
// Only go ahead if merge succeeded
earliestRefAvailTime =
- GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs);
+ GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs, clock, maxLastModifiedInterval,
+ sweepIfRefsPastRetention);
LOG.debug("Earliest reference available for timestamp [{}]", earliestRefAvailTime);
earliestRefAvailTime = (earliestRefAvailTime < markStart ? earliestRefAvailTime : markStart);
@@ -745,6 +756,10 @@ public class MarkSweepGarbageCollector i
traceOutput = trace;
}
+    /**
+     * Sets the clock used to compute the retention time (current time minus the max last
+     * modified interval) when merging marked references. Defaults to {@code Clock.SIMPLE};
+     * tests can substitute a controllable clock.
+     *
+     * @param clock the clock to use; must not be null
+     */
+    public void setClock(Clock clock) {
+        // Fail fast here instead of with an NPE later during the reference merge
+        checkNotNull(clock, "Clock cannot be null");
+        this.clock = clock;
+    }
+
/**
* BlobIdRetriever class to retrieve all blob ids.
*/
@@ -805,13 +820,15 @@ public class MarkSweepGarbageCollector i
*
* @param blobStore the blob store
* @param fs the fs
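+ * @param clock the clock supplying the current time used to compute the retention time
+ * @param maxLastModifiedInterval the max age subtracted from the current clock time to obtain the retention time
+ * @param sweepIfRefsPastRetention if true, throw NotAllRepositoryMarkedException unless every registered repository has a reference older than the retention time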
* @return the long the earliest time of the available references
* @throws IOException Signals that an I/O exception has occurred.
* @throws DataStoreException the data store exception
*/
@Override
- long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore,
- GarbageCollectorFileState fs)
+ long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs,
+ Clock clock, long maxLastModifiedInterval, boolean sweepIfRefsPastRetention)
throws IOException, DataStoreException {
List<DataRecord> refFiles =
@@ -824,7 +841,18 @@ public class MarkSweepGarbageCollector i
// Retrieve repos for which reference files have not been created
Set<String> unAvailRepos =
SharedDataStoreUtils.refsNotAvailableFromRepos(repoFiles, refFiles);
- if (unAvailRepos.isEmpty()) {
+
+ Set<String> notOldRefs = Collections.EMPTY_SET;
+ long retentionTime = clock.getTime() - maxLastModifiedInterval;
+ LOG.info("Retention time calculated [{}]", retentionTime);
+
+ if (sweepIfRefsPastRetention) {
+ notOldRefs =
+ SharedDataStoreUtils.refsNotOld(repoFiles, refFiles, retentionTime);
+ LOG.info("Repositories not having older references than retention time {}", notOldRefs);
+ }
+
+ if (unAvailRepos.isEmpty() && notOldRefs.isEmpty()) {
// List of files to be merged
List<File> files = newArrayList();
for (DataRecord refFile : refFiles) {
@@ -846,7 +874,8 @@ public class MarkSweepGarbageCollector i
return (earliestMarker < earliestRef ? earliestMarker : earliestRef);
} else {
- LOG.error("Not all repositories have marked references available : {}", unAvailRepos);
+ LOG.error("Not all repositories have marked references available : {} or older than retention time: {}",
+ unAvailRepos, notOldRefs);
throw new NotAllRepositoryMarkedException("Not all repositories have marked references available");
}
}
@@ -894,8 +923,8 @@ public class MarkSweepGarbageCollector i
void addMarked(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs, String repoId,
String uniqueSuffix) throws DataStoreException, IOException {}
- long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore,
- GarbageCollectorFileState fs)
+ long mergeAllMarkedReferences(GarbageCollectableBlobStore blobStore, GarbageCollectorFileState fs,
+ Clock clock, long maxLastModifiedInterval, boolean sweepIfRefsPastRetention)
throws IOException, DataStoreException {
// throw id the marked refs not available.
if (!fs.getMarkedRefs().exists() || fs.getMarkedRefs().length() == 0) {
Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/SharedDataStoreUtils.java Mon Apr 27 10:00:47 2020
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugin
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@@ -91,6 +92,34 @@ public class SharedDataStoreUtils {
}
/**
+     * Returns the ids of the registered repositories that have no marked reference record
+     * last modified before the given retention time.
+     *
+     * @param repos the repository registration records
+     * @param refs the marked reference records
+     * @param referenceTime the retention time; only references last modified strictly before it qualify
+     * @return the ids of the repositories without any reference older than {@code referenceTime}
+     */
+    public static Set<String> refsNotOld(List<DataRecord> repos,
+        List<DataRecord> refs, long referenceTime) {
+
+        // Ids of the repositories having at least one reference older than the retention time.
+        // NOTE(review): ids are parsed via MARKED_START_MARKER although these are reference records;
+        // confirm getIdFromName is type-independent, or switch to REFERENCES for clarity.
+        Set<String> qualifyingRefs = refs.stream()
+            .filter(dataRecord -> dataRecord.getLastModified() < referenceTime)
+            .map(dataRecord -> SharedStoreRecordType.MARKED_START_MARKER.getIdFromName(dataRecord.getIdentifier().toString()))
+            .collect(Collectors.toSet());
+
+        // Collectors.toSet() makes no mutability guarantee, so filter rather than removeAll on its result
+        return repos.stream()
+            .map(dataRecord -> SharedStoreRecordType.REPOSITORY.getIdFromName(dataRecord.getIdentifier().toString()))
+            .filter(repoId -> !qualifyingRefs.contains(repoId))
+            .collect(Collectors.toSet());
+    }
+
+ /**
* Encapsulates the different type of records at the data store root.
*/
public enum SharedStoreRecordType {
Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/BlobGCTest.java Mon Apr 27 10:00:47 2020
@@ -19,7 +19,6 @@
package org.apache.jackrabbit.oak.plugins.blob;
-import static org.apache.commons.codec.binary.Hex.encodeHexString;
import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.CONSISTENCY_NAME;
import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.FINISH_FAILURE;
import static org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector.GarbageCollectionOperationStats.NAME;
@@ -35,45 +34,25 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.security.DigestOutputStream;
-import java.security.MessageDigest;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
-import javax.jcr.RepositoryException;
-
import ch.qos.logback.classic.Level;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.output.NullOutputStream;
-import org.apache.jackrabbit.core.data.DataIdentifier;
-import org.apache.jackrabbit.core.data.DataRecord;
-import org.apache.jackrabbit.core.data.DataStore;
-import org.apache.jackrabbit.core.data.DataStoreException;
-import org.apache.jackrabbit.core.data.MultiDataStoreAware;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
@@ -84,14 +63,6 @@ import org.apache.jackrabbit.oak.commons
import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordAccessProvider;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
-import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadOptions;
-import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
-import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
-import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -104,8 +75,6 @@ import org.apache.jackrabbit.oak.spi.whi
import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -147,7 +116,7 @@ public class BlobGCTest {
}
};
- TimeLapsedDataStore dataStore = new TimeLapsedDataStore();
+ TimeLapsedDataStore dataStore = new TimeLapsedDataStore(clock);
DataStoreBlobStore blobStore = new DataStoreBlobStore(dataStore);
MemoryBlobStoreNodeStore nodeStore = new MemoryBlobStoreNodeStore(blobStore);
cluster = new Cluster(folder.newFolder(), blobStore, nodeStore, 0);
@@ -170,6 +139,7 @@ public class BlobGCTest {
class Cluster implements Closeable {
protected final BlobStoreState blobStoreState;
private final File root;
+ private final Clock clock;
String repoId;
protected final TimeLapsedDataStore dataStore;
protected final GarbageCollectableBlobStore blobStore;
@@ -186,6 +156,8 @@ public class BlobGCTest {
this.nodeStore = nodeStore;
this.dataStore = (TimeLapsedDataStore) ((DataStoreBlobStore) blobStore).getDataStore();
this.blobStore = blobStore;
+ this.clock = dataStore.getClock();
+
if (SharedDataStoreUtils.isShared(blobStore)) {
repoId = ClusterRepositoryInfo.getOrCreateId(nodeStore);
((SharedDataStore) blobStore).setRepositoryId(repoId);
@@ -205,13 +177,16 @@ public class BlobGCTest {
}
public MarkSweepGarbageCollector getCollector(long blobGcMaxAgeInSecs) throws Exception {
- return getCollector(blobGcMaxAgeInSecs, false);
+ return getCollector(blobGcMaxAgeInSecs, false, false);
}
- public MarkSweepGarbageCollector getCollector(long blobGcMaxAgeInSecs, boolean checkConsistency) throws Exception {
+ public MarkSweepGarbageCollector getCollector(long blobGcMaxAgeInSecs, boolean checkConsistency,
+ boolean sweepIfRefsPastRetention) throws Exception {
+
collector =
new MarkSweepGarbageCollector(referenceRetriever, blobStore, executor, root.getAbsolutePath(), 2048,
- blobGcMaxAgeInSecs, checkConsistency, repoId, wb, statsProvider);
+ blobGcMaxAgeInSecs, checkConsistency, sweepIfRefsPastRetention, repoId, wb, statsProvider);
+ collector.setClock(clock);
return collector;
}
@@ -285,6 +260,63 @@ public class BlobGCTest {
}
@Test
+    public void sharedGCRefsOld() throws Exception {
+        log.info("Starting sharedGCRefsOld()");
+
+        // Setup a different cluster/repository sharing the blob store
+        MemoryBlobStoreNodeStore secondClusterNodeStore = new MemoryBlobStoreNodeStore(cluster.blobStore);
+        Cluster secondCluster = new Cluster(folder.newFolder(), cluster.blobStore, secondClusterNodeStore, 100);
+        closer.register(secondCluster);
+
+        Sets.SetView<String> totalPresent =
+            Sets.union(cluster.blobStoreState.blobsPresent, secondCluster.blobStoreState.blobsPresent);
+        Sets.SetView<String> totalAdded =
+            Sets.union(cluster.blobStoreState.blobsAdded, secondCluster.blobStoreState.blobsAdded);
+
+        clock.waitUntil(clock.getTime() + 5);
+
+        // Execute mark on both clusters
+        executeGarbageCollection(cluster, cluster.getCollector(5), true);
+        executeGarbageCollection(secondCluster, secondCluster.getCollector(5), true);
+
+        // Advance the clock by the max age used for the retention-aware collector below
+        clock.waitUntil(clock.getTime() + 5);
+
+        // Sweep with sweepIfRefsPastRetention enabled; expected to succeed and remove only unreferenced blobs
+        Set<String> existingAfterGC = executeGarbageCollection(secondCluster, secondCluster.getCollector(5, false, true), false);
+
+        assertTrue(Sets.symmetricDifference(totalPresent, existingAfterGC).isEmpty());
+        assertStats(secondCluster.statsProvider, 2, 0, totalAdded.size() - totalPresent.size(),
+            totalAdded.size() - totalPresent.size(), NAME);
+    }
+
+    @Test
+    public void sharedGCRefsNotOld() throws Exception {
+        log.info("Starting sharedGCRefsNotOld()");
+
+        // Setup a different cluster/repository sharing the blob store
+        MemoryBlobStoreNodeStore secondClusterNodeStore = new MemoryBlobStoreNodeStore(cluster.blobStore);
+        Cluster secondCluster = new Cluster(folder.newFolder(), cluster.blobStore, secondClusterNodeStore, 100);
+        closer.register(secondCluster);
+
+        Sets.SetView<String> totalPresent =
+            Sets.union(cluster.blobStoreState.blobsPresent, secondCluster.blobStoreState.blobsPresent);
+        Sets.SetView<String> totalAdded =
+            Sets.union(cluster.blobStoreState.blobsAdded, secondCluster.blobStoreState.blobsAdded);
+
+        // Execute mark on the default cluster
+        executeGarbageCollection(cluster, cluster.getCollector(5), true);
+
+        // Advance the clock by 5 only, so the default cluster's marks stay within the max age of 6 used below
+        clock.waitUntil(clock.getTime() + 5);
+
+        executeGarbageCollection(secondCluster, secondCluster.getCollector(5), true);
+
+        // Sweep with sweepIfRefsPastRetention enabled; expected to abort and leave every added blob in place
+        Set<String> existingAfterGC = executeGarbageCollection(secondCluster, secondCluster.getCollector(6, false, true), false);
+
+        assertTrue(Sets.symmetricDifference(totalAdded, existingAfterGC).isEmpty());
+        assertStats(secondCluster.statsProvider, 2, 1, 0, 0, NAME);
+    }
+
+ @Test
public void gc() throws Exception {
log.info("Starting gc()");
@@ -300,7 +332,7 @@ public class BlobGCTest {
log.info("Starting gcWithConsistencyCheck()");
((MemoryBlobStoreNodeStore) cluster.nodeStore).getReferencedBlobs().add("SPURIOUS");
- MarkSweepGarbageCollector collector = cluster.getCollector(0, true);
+ MarkSweepGarbageCollector collector = cluster.getCollector(0, true, false);
Set<String> existingAfterGC = executeGarbageCollection(cluster, collector, false);
assertFalse(Sets.symmetricDifference(cluster.blobStoreState.blobsPresent, existingAfterGC).isEmpty());
assertStats(cluster.statsProvider, 1, 0,
@@ -393,7 +425,7 @@ public class BlobGCTest {
secondCluster.blobStoreState.blobsPresent.add(Iterables.firstOf(cluster.blobStoreState.blobsPresent));
// Execute mark on the default cluster
executeGarbageCollection(cluster, cluster.getCollector(0), true);
- MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true);
+ MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true, false);
long missing = globalCollector.checkConsistency();
assertEquals(0, missing);
assertStats(secondCluster.statsProvider, 1, 0, 0, 0, CONSISTENCY_NAME);
@@ -415,7 +447,7 @@ public class BlobGCTest {
// Execute mark on the default cluster
executeGarbageCollection(cluster, cluster.getCollector(0), true);
- MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true);
+ MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true, false);
long missing = globalCollector.checkConsistency();
assertEquals(1, missing);
assertStats(secondCluster.statsProvider, 1, 1, 1, 0, CONSISTENCY_NAME);
@@ -437,7 +469,7 @@ public class BlobGCTest {
// Execute mark on the default cluster
executeGarbageCollection(cluster, cluster.getCollector(0), true);
- MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true);
+ MarkSweepGarbageCollector globalCollector = secondCluster.getCollector(0, true, false);
long missing = globalCollector.checkConsistency();
assertEquals(1, missing);
assertStats(secondCluster.statsProvider, 1, 1, 1, 0, CONSISTENCY_NAME);
@@ -638,339 +670,4 @@ public class BlobGCTest {
Set<String> blobsPresent = Sets.newHashSet();
}
- /**
- * MemoryNodeStore extension which created blobs in the in-memory blob store
- */
- public static class MemoryBlobStoreNodeStore extends MemoryNodeStore {
- private final BlobStore blobStore;
- private final boolean fakePath;
- Set<String> referencedBlobs;
-
- public MemoryBlobStoreNodeStore(BlobStore blobStore) {
- this(blobStore, false);
- }
-
- public MemoryBlobStoreNodeStore(BlobStore blobStore, boolean fakePath) {
- this.blobStore = blobStore;
- this.fakePath = fakePath;
- }
-
- public void setReferencedBlobs(Set<String> referencedBlobs) {
- this.referencedBlobs = referencedBlobs;
- }
-
- public Set<String> getReferencedBlobs() {
- return this.referencedBlobs;
- }
-
- @Override
- public ArrayBasedBlob createBlob(InputStream in) {
- try {
- String id = blobStore.writeBlob(in);
- return new TestBlob(id, blobStore);
- } catch(Exception e) {
- log.error("Error in createBlobs", e);
- }
- return null;
- }
-
- public BlobReferenceRetriever getBlobReferenceRetriever() {
- return collector -> {
- for (String id : referencedBlobs) {
- collector.addReference(id, (fakePath ? UUID.randomUUID().toString() : null));
- }
- };
- }
-
- static class TestBlob extends ArrayBasedBlob {
- private String id;
- private BlobStore blobStore;
-
- public TestBlob(String id, BlobStore blobStore) {
- super(new byte[0]);
- this.id = id;
- this.blobStore = blobStore;
- }
-
- @Override
- public String getContentIdentity() {
- return id;
- }
- @NotNull
- @Override
- public InputStream getNewStream() {
- try {
- return blobStore.getInputStream(id);
- } catch (IOException e) {
- log.error("Error in getNewStream", e);
- }
- return null;
- }
-
- @Override
- public long length() {
- try {
- return blobStore.getBlobLength(id);
- } catch (IOException e) {
- log.error("Error in length", e);
- }
- return 0;
- }
- }
- }
-
- /**
- * Test in memory DS to store the contents with an increasing time
- */
- class TimeLapsedDataStore implements DataStore, MultiDataStoreAware, SharedDataStore, DataRecordAccessProvider {
- public static final int MIN_RECORD_LENGTH = 50;
-
- private final long startTime;
- Map<String, DataRecord> store;
- Map<String, DataRecord> metadata;
- Map<String, String> uploadTokens;
-
- public TimeLapsedDataStore() {
- this.startTime = clock.getTime();
- store = Maps.newHashMap();
- metadata = Maps.newHashMap();
- uploadTokens = Maps.newHashMap();
- }
-
- @Override public DataRecord getRecordIfStored(DataIdentifier identifier) throws DataStoreException {
- if (store.containsKey(identifier.toString())) {
- return getRecord(identifier);
- }
- return null;
- }
-
- @Override public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
- return store.get(identifier.toString());
- }
-
- @Override public DataRecord getRecordFromReference(String reference) throws DataStoreException {
- return getRecord(new DataIdentifier(reference));
- }
-
- @Override public DataRecord addRecord(InputStream stream) throws DataStoreException {
- try {
- byte[] data = IOUtils.toByteArray(stream);
- String id = getIdForInputStream(new ByteArrayInputStream(data));
- TestRecord rec = new TestRecord(id, data, clock.getTime());
- store.put(id, rec);
- log.info("Blob created {} with timestamp {}", rec.id, rec.lastModified);
- return rec;
- } catch (Exception e) {
- throw new DataStoreException(e);
- }
-
- }
-
- @Override public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
- return Iterators.transform(store.keySet().iterator(), input -> new DataIdentifier(input));
- }
-
- @Override public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
- store.remove(identifier.toString());
- }
-
- /***************************************** SharedDataStore ***************************************/
-
- @Override public void addMetadataRecord(InputStream stream, String name) throws DataStoreException {
- try {
- byte[] data = IOUtils.toByteArray(stream);
- TestRecord rec = new TestRecord(name, data, clock.getTime());
- metadata.put(name, rec);
- log.info("Metadata created {} with timestamp {}", rec.id, rec.lastModified);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override public void addMetadataRecord(File f, String name) throws DataStoreException {
- FileInputStream fstream = null;
- try {
- fstream = new FileInputStream(f);
- addMetadataRecord(fstream, name);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- IOUtils.closeQuietly(fstream);
- }
- }
-
- @Override public DataRecord getMetadataRecord(String name) {
- return metadata.get(name);
- }
-
- @Override public boolean metadataRecordExists(String name) {
- return metadata.containsKey(name);
- }
-
- @Override public List<DataRecord> getAllMetadataRecords(String prefix) {
- List<DataRecord> recs = Lists.newArrayList();
- Iterator<Map.Entry<String, DataRecord>> iter = metadata.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, DataRecord> entry = iter.next();
- if (entry.getKey().startsWith(prefix)) {
- recs.add(entry.getValue());
- }
- }
- return recs;
- }
-
- @Override public boolean deleteMetadataRecord(String name) {
- metadata.remove(name);
- if (!metadata.containsKey(name)) {
- return true;
- }
- return false;
- }
-
- @Override public void deleteAllMetadataRecords(String prefix) {
- List<String> recs = Lists.newArrayList();
- Iterator<Map.Entry<String, DataRecord>> iter = metadata.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, DataRecord> entry = iter.next();
- if (entry.getKey().startsWith(prefix)) {
- recs.add(entry.getKey());
- }
- }
-
- for(String key: recs) {
- metadata.remove(key);
- }
- }
-
- @Override public Iterator<DataRecord> getAllRecords() throws DataStoreException {
- return store.values().iterator();
- }
-
- @Override public DataRecord getRecordForId(DataIdentifier id) throws DataStoreException {
- return store.get(id.toString());
- }
-
- @Override public SharedDataStore.Type getType() {
- return SharedDataStore.Type.SHARED;
- }
-
- /**************************** DataRecordAccessProvider *************************/
-
- @Override public @Nullable URI getDownloadURI(@NotNull DataIdentifier identifier,
- @NotNull DataRecordDownloadOptions downloadOptions) {
- return null;
- }
-
- @Override
- public @Nullable DataRecordUpload initiateDataRecordUpload(long maxUploadSizeInBytes, int maxNumberOfURIs)
- throws IllegalArgumentException, DataRecordUploadException {
- return initiateDataRecordUpload(maxUploadSizeInBytes, maxNumberOfURIs, DataRecordUploadOptions.DEFAULT);
- }
-
- @Override
- public @Nullable DataRecordUpload initiateDataRecordUpload(long maxUploadSizeInBytes, int maxNumberOfURIs, @NotNull final DataRecordUploadOptions options)
- throws IllegalArgumentException, DataRecordUploadException {
- String upToken = UUID.randomUUID().toString();
- Random rand = new Random();
- InputStream stream = randomStream(rand.nextInt(1000), 100);
- byte[] data = new byte[0];
- try {
- data = IOUtils.toByteArray(stream);
- } catch (IOException e) {
- throw new DataRecordUploadException(e);
- }
- TestRecord rec = new TestRecord(upToken, data, clock.getTime());
- store.put(upToken, rec);
-
- DataRecordUpload uploadRec = new DataRecordUpload() {
- @Override public @NotNull String getUploadToken() {
- return upToken;
- }
-
- @Override public long getMinPartSize() {
- return maxUploadSizeInBytes;
- }
-
- @Override public long getMaxPartSize() {
- return maxUploadSizeInBytes;
- }
-
- @Override public @NotNull Collection<URI> getUploadURIs() {
- return Collections.EMPTY_LIST;
- }
- };
- return uploadRec;
- }
-
- @Override public @NotNull DataRecord completeDataRecordUpload(@NotNull String uploadToken)
- throws IllegalArgumentException, DataRecordUploadException, DataStoreException {
- return store.get(uploadToken);
- }
-
- class TestRecord implements DataRecord {
- String id;
- byte[] data;
- long lastModified;
-
- public TestRecord(String id, byte[] data, long lastModified) {
- this.id = id;
- this.data = data;
- this.lastModified = lastModified;
- }
-
- @Override public DataIdentifier getIdentifier() {
- return new DataIdentifier(id);
- }
-
- @Override public String getReference() {
- return id;
- }
-
- @Override public long getLength() throws DataStoreException {
- return data.length;
- }
-
- @Override public InputStream getStream() throws DataStoreException {
- return new ByteArrayInputStream(data);
- }
-
- @Override public long getLastModified() {
- return lastModified;
- }
- }
-
- private String getIdForInputStream(final InputStream in)
- throws Exception {
- MessageDigest digest = MessageDigest.getInstance("SHA-256");
- OutputStream output = new DigestOutputStream(new NullOutputStream(), digest);
- try {
- IOUtils.copyLarge(in, output);
- } finally {
- IOUtils.closeQuietly(output);
- IOUtils.closeQuietly(in);
- }
- return encodeHexString(digest.digest());
- }
-
- /*************************************** No Op ***********************/
- @Override public void init(String homeDir) throws RepositoryException {
- }
-
- @Override public void updateModifiedDateOnAccess(long before) {
- }
-
- @Override public int deleteAllOlderThan(long min) throws DataStoreException {
- return 0;
- }
-
- @Override public int getMinRecordLength() {
- return MIN_RECORD_LENGTH;
- }
-
- @Override public void close() throws DataStoreException {
- }
-
- @Override public void clearInUse() {
- }
- }
}
Added: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/MemoryBlobStoreNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/MemoryBlobStoreNodeStore.java?rev=1877058&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/MemoryBlobStoreNodeStore.java (added)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/MemoryBlobStoreNodeStore.java Mon Apr 27 10:00:47 2020
@@ -0,0 +1,92 @@
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * MemoryNodeStore extension which created blobs in the in-memory blob store
+ */
+public class MemoryBlobStoreNodeStore extends MemoryNodeStore {
+ private final BlobStore blobStore;
+ private final boolean fakePath;
+ Set<String> referencedBlobs;
+
+ public MemoryBlobStoreNodeStore(BlobStore blobStore) {
+ this(blobStore, false);
+ }
+
+ public MemoryBlobStoreNodeStore(BlobStore blobStore, boolean fakePath) {
+ this.blobStore = blobStore;
+ this.fakePath = fakePath;
+ }
+
+ public void setReferencedBlobs(Set<String> referencedBlobs) {
+ this.referencedBlobs = referencedBlobs;
+ }
+
+ public Set<String> getReferencedBlobs() {
+ return this.referencedBlobs;
+ }
+
+ @Override
+ public ArrayBasedBlob createBlob(InputStream in) {
+ try {
+ String id = blobStore.writeBlob(in);
+ return new TestBlob(id, blobStore);
+ } catch(Exception e) {
+ BlobGCTest.log.error("Error in createBlobs", e);
+ }
+ return null;
+ }
+
+ public BlobReferenceRetriever getBlobReferenceRetriever() {
+ return collector -> {
+ for (String id : referencedBlobs) {
+ collector.addReference(id, (fakePath ? UUID.randomUUID().toString() : null));
+ }
+ };
+ }
+
+ static class TestBlob extends ArrayBasedBlob {
+ private String id;
+ private BlobStore blobStore;
+
+ public TestBlob(String id, BlobStore blobStore) {
+ super(new byte[0]);
+ this.id = id;
+ this.blobStore = blobStore;
+ }
+
+ @Override
+ public String getContentIdentity() {
+ return id;
+ }
+ @NotNull
+ @Override
+ public InputStream getNewStream() {
+ try {
+ return blobStore.getInputStream(id);
+ } catch (IOException e) {
+ BlobGCTest.log.error("Error in getNewStream", e);
+ }
+ return null;
+ }
+
+ @Override
+ public long length() {
+ try {
+ return blobStore.getBlobLength(id);
+ } catch (IOException e) {
+ BlobGCTest.log.error("Error in length", e);
+ }
+ return 0;
+ }
+ }
+}
Propchange: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/MemoryBlobStoreNodeStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreMarkSweepGarbageCollectorTest.java Mon Apr 27 10:00:47 2020
@@ -91,6 +91,7 @@ public class SharedDataStoreMarkSweepGar
1,
0L,
false,
+ false,
"repo",
whiteboard,
new DefaultStatisticsProvider(executor)
Modified: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java (original)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/SharedDataStoreUtilsTest.java Mon Apr 27 10:00:47 2020
@@ -23,12 +23,15 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
@@ -45,6 +48,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
import org.junit.Rule;
@@ -217,6 +221,96 @@ public class SharedDataStoreUtilsTest {
}
@Test
+    public void testRefsOld() throws Exception {
+        Clock clock = new Clock.Virtual();
+        TimeLapsedDataStore ds = new TimeLapsedDataStore(clock);
+        String repoId1 = UUID.randomUUID().toString();
+        String repoId2 = UUID.randomUUID().toString();
+        registerRepoWithStartMarker(ds, repoId1);
+        registerRepoWithStartMarker(ds, repoId2);
+
+        clock.waitUntil(10);
+        // Reference time 5: the start markers of both repositories count as old,
+        // so no repository is expected to be reported.
+        Set<String> repos = queryRefsNotOld(ds, 5);
+        assertEquals(Collections.emptySet(), repos);
+    }
+
+    @Test
+    public void testRefsNotOldOne() throws Exception {
+        Clock clock = new Clock.Virtual();
+        TimeLapsedDataStore ds = new TimeLapsedDataStore(clock);
+        String repoId1 = UUID.randomUUID().toString();
+        String repoId2 = UUID.randomUUID().toString();
+        registerRepoWithStartMarker(ds, repoId1);
+        registerRepoWithStartMarker(ds, repoId2);
+
+        clock.waitUntil(10);
+        // Reference time 3: only the second repository, whose start marker was written last,
+        // is expected to be reported as having references that are not old.
+        Set<String> repos = queryRefsNotOld(ds, 3);
+        assertEquals(Stream.of(repoId2).collect(Collectors.toSet()), repos);
+    }
+
+    @Test
+    public void testRefsNotOldAll() throws Exception {
+        Clock clock = new Clock.Virtual();
+        TimeLapsedDataStore ds = new TimeLapsedDataStore(clock);
+        String repoId1 = UUID.randomUUID().toString();
+        String repoId2 = UUID.randomUUID().toString();
+        registerRepoWithStartMarker(ds, repoId1);
+        registerRepoWithStartMarker(ds, repoId2);
+
+        clock.waitUntil(5);
+        // Reference time 2: both repositories are expected to be reported as having
+        // references that are not old.
+        Set<String> repos = queryRefsNotOld(ds, 2);
+        assertEquals(Stream.of(repoId1, repoId2).collect(Collectors.toSet()), repos);
+    }
+
+    /**
+     * Adds an empty REPOSITORY metadata record followed by an empty MARKED_START_MARKER record
+     * for the given repository id. The two records are added in that order, so each one takes
+     * its own timestamp from the store's clock.
+     */
+    private static void registerRepoWithStartMarker(TimeLapsedDataStore ds, String repoId) throws Exception {
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.REPOSITORY.getNameFromId(repoId));
+        ds.addMetadataRecord(new ByteArrayInputStream(new byte[0]),
+            SharedStoreRecordType.MARKED_START_MARKER.getNameFromId(repoId));
+    }
+
+    /**
+     * Runs {@link SharedDataStoreUtils#refsNotOld} over all repository and start-marker metadata
+     * records of {@code ds} with the given reference time.
+     */
+    private static Set<String> queryRefsNotOld(TimeLapsedDataStore ds, long referenceTime) throws Exception {
+        return SharedDataStoreUtils.refsNotOld(
+            ds.getAllMetadataRecords(SharedStoreRecordType.REPOSITORY.getType()),
+            ds.getAllMetadataRecords(SharedStoreRecordType.MARKED_START_MARKER.getType()),
+            referenceTime);
+    }
+
+ @Test
public void repoMarkerExistOnClose() throws Exception {
File rootFolder = folder.newFolder();
dataStore = getBlobStore(rootFolder);
Added: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/TimeLapsedDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/TimeLapsedDataStore.java?rev=1877058&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/TimeLapsedDataStore.java (added)
+++ jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/TimeLapsedDataStore.java Mon Apr 27 10:00:47 2020
@@ -0,0 +1,303 @@
+package org.apache.jackrabbit.oak.plugins.blob;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import javax.jcr.RepositoryException;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStore;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.core.data.MultiDataStoreAware;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordAccessProvider;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
+import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadOptions;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.commons.codec.binary.Hex.encodeHexString;
+import static org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils.randomStream;
+
+/**
+ * In-memory {@link DataStore} / {@link SharedDataStore} test double. Every record it creates is
+ * stamped with the current time of the injected {@link Clock}, which lets tests control how old
+ * each record appears.
+ * <p>
+ * No synchronization is performed; all state lives in plain hash maps.
+ */
+public class TimeLapsedDataStore implements DataStore, MultiDataStoreAware, SharedDataStore, DataRecordAccessProvider {
+    public static final int MIN_RECORD_LENGTH = 50;
+
+    // NOTE(review): startTime is never read in this class. It is kept because the clock.getTime()
+    // call in the constructor may advance a virtual clock and so shift the timestamps of all later
+    // records, which tests may depend on. Confirm before removing.
+    private final long startTime;
+    private final Clock clock;
+    Map<String, DataRecord> store;
+    Map<String, DataRecord> metadata;
+    Map<String, String> uploadTokens;
+
+    /**
+     * @param clock source of the last-modified timestamp given to every record
+     */
+    public TimeLapsedDataStore(Clock clock) {
+        this.startTime = clock.getTime();
+        this.clock = clock;
+        store = Maps.newHashMap();
+        metadata = Maps.newHashMap();
+        uploadTokens = Maps.newHashMap();
+    }
+
+    protected Clock getClock() {
+        return clock;
+    }
+
+    @Override public DataRecord getRecordIfStored(DataIdentifier identifier) throws DataStoreException {
+        if (store.containsKey(identifier.toString())) {
+            return getRecord(identifier);
+        }
+        return null;
+    }
+
+    @Override public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
+        return store.get(identifier.toString());
+    }
+
+    @Override public DataRecord getRecordFromReference(String reference) throws DataStoreException {
+        return getRecord(new DataIdentifier(reference));
+    }
+
+    /**
+     * Stores the stream's bytes under their SHA-256 hex digest, stamped with the current clock time.
+     */
+    @Override public DataRecord addRecord(InputStream stream) throws DataStoreException {
+        try {
+            byte[] data = IOUtils.toByteArray(stream);
+            String id = getIdForInputStream(new ByteArrayInputStream(data));
+            TestRecord rec = new TestRecord(id, data, clock.getTime());
+            store.put(id, rec);
+            BlobGCTest.log.info("Blob created {} with timestamp {}", rec.id, rec.lastModified);
+            return rec;
+        } catch (Exception e) {
+            throw new DataStoreException(e);
+        }
+    }
+
+    @Override public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException {
+        return Iterators.transform(store.keySet().iterator(), input -> new DataIdentifier(input));
+    }
+
+    @Override public void deleteRecord(DataIdentifier identifier) throws DataStoreException {
+        store.remove(identifier.toString());
+    }
+
+    /***************************************** SharedDataStore ***************************************/
+
+    /**
+     * Stores a metadata record under {@code name}, stamped with the current clock time.
+     *
+     * @throws DataStoreException if the stream cannot be read
+     */
+    @Override public void addMetadataRecord(InputStream stream, String name) throws DataStoreException {
+        try {
+            byte[] data = IOUtils.toByteArray(stream);
+            TestRecord rec = new TestRecord(name, data, clock.getTime());
+            metadata.put(name, rec);
+            BlobGCTest.log.info("Metadata created {} with timestamp {}", rec.id, rec.lastModified);
+        } catch (IOException e) {
+            // Report the failure as declared instead of silently dropping the record.
+            throw new DataStoreException(e);
+        }
+    }
+
+    /**
+     * Stores the contents of {@code f} as a metadata record under {@code name}.
+     *
+     * @throws DataStoreException if the file cannot be opened or read
+     */
+    @Override public void addMetadataRecord(File f, String name) throws DataStoreException {
+        try (FileInputStream fstream = new FileInputStream(f)) {
+            addMetadataRecord(fstream, name);
+        } catch (IOException e) {
+            throw new DataStoreException(e);
+        }
+    }
+
+    @Override public DataRecord getMetadataRecord(String name) {
+        return metadata.get(name);
+    }
+
+    @Override public boolean metadataRecordExists(String name) {
+        return metadata.containsKey(name);
+    }
+
+    /** Returns all metadata records whose name starts with {@code prefix}. */
+    @Override public List<DataRecord> getAllMetadataRecords(String prefix) {
+        List<DataRecord> recs = Lists.newArrayList();
+        for (Map.Entry<String, DataRecord> entry : metadata.entrySet()) {
+            if (entry.getKey().startsWith(prefix)) {
+                recs.add(entry.getValue());
+            }
+        }
+        return recs;
+    }
+
+    /**
+     * Removes the named metadata record.
+     *
+     * @return {@code true} once no record with that name remains, including when none existed
+     */
+    @Override public boolean deleteMetadataRecord(String name) {
+        metadata.remove(name);
+        return !metadata.containsKey(name);
+    }
+
+    /** Removes every metadata record whose name starts with {@code prefix}. */
+    @Override public void deleteAllMetadataRecords(String prefix) {
+        metadata.keySet().removeIf(key -> key.startsWith(prefix));
+    }
+
+    @Override public Iterator<DataRecord> getAllRecords() throws DataStoreException {
+        return store.values().iterator();
+    }
+
+    @Override public DataRecord getRecordForId(DataIdentifier id) throws DataStoreException {
+        return store.get(id.toString());
+    }
+
+    @Override public Type getType() {
+        return Type.SHARED;
+    }
+
+    /**************************** DataRecordAccessProvider *************************/
+
+    @Override public @Nullable URI getDownloadURI(@NotNull DataIdentifier identifier,
+        @NotNull DataRecordDownloadOptions downloadOptions) {
+        return null;
+    }
+
+    @Override
+    public @Nullable DataRecordUpload initiateDataRecordUpload(long maxUploadSizeInBytes, int maxNumberOfURIs)
+        throws IllegalArgumentException, DataRecordUploadException {
+        return initiateDataRecordUpload(maxUploadSizeInBytes, maxNumberOfURIs, DataRecordUploadOptions.DEFAULT);
+    }
+
+    /**
+     * Simulates a direct upload: a record with random content is stored under a fresh upload token
+     * right away (stamped with the current clock time), and the token is returned with no URIs.
+     */
+    @Override
+    public @Nullable DataRecordUpload initiateDataRecordUpload(long maxUploadSizeInBytes, int maxNumberOfURIs,
+        @NotNull final DataRecordUploadOptions options) throws IllegalArgumentException, DataRecordUploadException {
+        String upToken = UUID.randomUUID().toString();
+        Random rand = new Random();
+        byte[] data;
+        try (InputStream stream = randomStream(rand.nextInt(1000), 100)) {
+            data = IOUtils.toByteArray(stream);
+        } catch (IOException e) {
+            throw new DataRecordUploadException(e);
+        }
+        TestRecord rec = new TestRecord(upToken, data, clock.getTime());
+        store.put(upToken, rec);
+
+        return new DataRecordUpload() {
+            @Override public @NotNull String getUploadToken() {
+                return upToken;
+            }
+
+            @Override public long getMinPartSize() {
+                return maxUploadSizeInBytes;
+            }
+
+            @Override public long getMaxPartSize() {
+                return maxUploadSizeInBytes;
+            }
+
+            @Override public @NotNull Collection<URI> getUploadURIs() {
+                return Collections.emptyList();
+            }
+        };
+    }
+
+    /**
+     * Returns the record created by {@link #initiateDataRecordUpload} for this token.
+     *
+     * @throws IllegalArgumentException if no record exists for {@code uploadToken}
+     */
+    @Override public @NotNull DataRecord completeDataRecordUpload(@NotNull String uploadToken)
+        throws IllegalArgumentException, DataRecordUploadException, DataStoreException {
+        DataRecord rec = store.get(uploadToken);
+        if (rec == null) {
+            // Honour the @NotNull contract instead of returning null for an unknown token.
+            throw new IllegalArgumentException("Unknown upload token: " + uploadToken);
+        }
+        return rec;
+    }
+
+    /** In-memory record holding an id, its content bytes and a last-modified timestamp. */
+    static class TestRecord implements DataRecord {
+        String id;
+        byte[] data;
+        long lastModified;
+
+        public TestRecord(String id, byte[] data, long lastModified) {
+            this.id = id;
+            this.data = data;
+            this.lastModified = lastModified;
+        }
+
+        @Override public DataIdentifier getIdentifier() {
+            return new DataIdentifier(id);
+        }
+
+        @Override public String getReference() {
+            return id;
+        }
+
+        @Override public long getLength() throws DataStoreException {
+            return data.length;
+        }
+
+        @Override public InputStream getStream() throws DataStoreException {
+            return new ByteArrayInputStream(data);
+        }
+
+        @Override public long getLastModified() {
+            return lastModified;
+        }
+    }
+
+    /** Returns the hex-encoded SHA-256 digest of the stream's content; the stream is closed. */
+    private static String getIdForInputStream(final InputStream in) throws Exception {
+        MessageDigest digest = MessageDigest.getInstance("SHA-256");
+        try (InputStream input = in;
+             OutputStream output = new DigestOutputStream(new NullOutputStream(), digest)) {
+            IOUtils.copyLarge(input, output);
+        }
+        return encodeHexString(digest.digest());
+    }
+
+    /*************************************** No Op ***********************/
+    @Override public void init(String homeDir) throws RepositoryException {
+    }
+
+    @Override public void updateModifiedDateOnAccess(long before) {
+    }
+
+    @Override public int deleteAllOlderThan(long min) throws DataStoreException {
+        return 0;
+    }
+
+    @Override public int getMinRecordLength() {
+        return MIN_RECORD_LENGTH;
+    }
+
+    @Override public void close() throws DataStoreException {
+    }
+
+    @Override public void clearInUse() {
+    }
+}
Propchange: jackrabbit/oak/trunk/oak-blob-plugins/src/test/java/org/apache/jackrabbit/oak/plugins/blob/TimeLapsedDataStore.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-run/README.md
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/README.md?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/README.md (original)
+++ jackrabbit/oak/trunk/oak-run/README.md Mon Apr 27 10:00:47 2020
@@ -595,7 +595,10 @@ The following options are available:
--export-metrics - Option to export the captured metrics. The format of the command is type;URL;key1=value1,key2=value2
Currently only [Prometheus Pushgateway](https://github.com/prometheus/pushgateway) is supported
e.g. --export-metrics "pushgateway;localhost:9091;key1=value1,key2=value2"
-
+ --sweep-only-refs-past-retention - Sweep only if the earliest references from all repositories are past the retention period, which is governed by the max-age parameter.
+ Boolean (Optional). Defaults to False. Only applicable for --collect-garbage
+ --check-consistency-gc - Performs a consistency check immediately after the GC.
+ Boolean (Optional). Defaults to False. Only applicable for --collect-garbage
Note:
Note: When using --export-metrics the following additional jars have to be downloaded to support Prometheus Pushgateway
Modified: jackrabbit/oak/trunk/oak-run/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/pom.xml?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-run/pom.xml Mon Apr 27 10:00:47 2020
@@ -524,6 +524,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-blob-plugins</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
</profiles>
Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java Mon Apr 27 10:00:47 2020
@@ -352,7 +352,8 @@ public class DataStoreCommand implements
new MarkSweepGarbageCollector(retriever, (GarbageCollectableBlobStore) fixture.getBlobStore(), service,
dataStoreOpts.getOutDir().getAbsolutePath(), dataStoreOpts.getBatchCount(),
SECONDS.toMillis(dataStoreOpts.getBlobGcMaxAgeInSecs()), dataStoreOpts.checkConsistencyAfterGC(),
- repositoryId, fixture.getWhiteboard(), getService(fixture.getWhiteboard(), StatisticsProvider.class));
+ dataStoreOpts.sweepIfRefsPastRetention(), repositoryId, fixture.getWhiteboard(),
+ getService(fixture.getWhiteboard(), StatisticsProvider.class));
collector.setTraceOutput(true);
return collector;
Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreOptions.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreOptions.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreOptions.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreOptions.java Mon Apr 27 10:00:47 2020
@@ -56,6 +56,7 @@ public class DataStoreOptions implements
private final OptionSpec<Boolean> resetLoggingConfig;
private OptionSpec<String> exportMetrics;
private static final String DELIM = ",";
+ private OptionSpec<Boolean> sweepIfRefsPastRetention;
public DataStoreOptions(OptionParser parser) {
collectGarbage = parser.accepts("collect-garbage",
@@ -67,6 +68,10 @@ public class DataStoreOptions implements
"Performs a consistency check immediately after DSGC")
.withOptionalArg().ofType(Boolean.class).defaultsTo(Boolean.FALSE);
+ sweepIfRefsPastRetention = parser.accepts("sweep-only-refs-past-retention",
+ "Only allows sweep if all references available older than retention time (Default false)")
+ .withOptionalArg().ofType(Boolean.class).defaultsTo(Boolean.FALSE);
+
consistencyCheck =
parser.accepts("check-consistency", "Performs a consistency check on the repository/datastore defined");
@@ -233,4 +238,7 @@ public class DataStoreOptions implements
return options.valuesOf(verbosePathInclusionRegex);
}
+    /**
+     * Whether blob GC should sweep only when the earliest references of all registered
+     * repositories are past the retention period ({@code --sweep-only-refs-past-retention}).
+     *
+     * @return {@code false} when the option is absent, otherwise the option's parsed value
+     */
+    public boolean sweepIfRefsPastRetention() {
+        if (!options.has(sweepIfRefsPastRetention)) {
+            return false;
+        }
+        return sweepIfRefsPastRetention.value(options);
+    }
}
Modified: jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandMetadataTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandMetadataTest.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandMetadataTest.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandMetadataTest.java Mon Apr 27 10:00:47 2020
@@ -32,7 +32,7 @@ import com.google.common.collect.Sets;
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.commons.FileIOUtils;
-import org.apache.jackrabbit.oak.plugins.blob.BlobGCTest.MemoryBlobStoreNodeStore;
+import org.apache.jackrabbit.oak.plugins.blob.MemoryBlobStoreNodeStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.run.DataStoreCommandTest.DataStoreFixture;
import org.apache.jackrabbit.oak.run.DataStoreCommandTest.StoreFixture;
Modified: jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java?rev=1877058&r1=1877057&r2=1877058&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java Mon Apr 27 10:00:47 2020
@@ -58,7 +58,7 @@ import org.apache.jackrabbit.oak.blob.cl
import org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils;
import org.apache.jackrabbit.oak.commons.FileIOUtils;
import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
-import org.apache.jackrabbit.oak.plugins.blob.BlobGCTest.MemoryBlobStoreNodeStore;
+import org.apache.jackrabbit.oak.plugins.blob.MemoryBlobStoreNodeStore;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.OakFileDataStore;
@@ -527,7 +527,7 @@ public class DataStoreCommandTest {
Data data = prepareData(storeFixture, blobFixture, 10, 5, 1);
storeFixture.close();
additionalParams += " --check-consistency-gc true";
- testGc(dump, data, 0, false);
+ testGc(dump, data, 0, false, false);
assertFileEquals(dump, "avail-", Sets.difference(data.added, data.missingDataStore));
@@ -536,12 +536,30 @@ public class DataStoreCommandTest {
}
@Test
+    public void gcSweepRefsOld() throws Exception {
+        // max-age 0 with --sweep-only-refs-past-retention: the sweep is expected to run,
+        // hence the final testGc argument is false.
+        File dumpDir = temporaryFolder.newFolder();
+        Data prepared = prepareData(storeFixture, blobFixture, 10, 5, 0);
+        storeFixture.close();
+        additionalParams = additionalParams
+            + " --check-consistency-gc true --sweep-only-refs-past-retention true";
+        testGc(dumpDir, prepared, 0, false, false);
+    }
+
+    @Test
+    public void gcSweepRefsNotOld() throws Exception {
+        // max-age 1000 with --sweep-only-refs-past-retention: the sweep is expected to be
+        // skipped, hence the final testGc argument is true (no avail-/gccand- files asserted).
+        File dumpDir = temporaryFolder.newFolder();
+        Data prepared = prepareData(storeFixture, blobFixture, 10, 5, 0);
+        storeFixture.close();
+        additionalParams = additionalParams
+            + " --check-consistency-gc true --sweep-only-refs-past-retention true";
+        testGc(dumpDir, prepared, 1000, false, true);
+    }
+
+ @Test
public void gc() throws Exception {
File dump = temporaryFolder.newFolder();
Data data = prepareData(storeFixture, blobFixture, 10, 5, 1);
storeFixture.close();
- testGc(dump, data, 0, false);
+ testGc(dump, data, 0, false, false);
}
/*
Command should throw and exception if --verboseRootPath specified
@@ -555,7 +573,7 @@ public class DataStoreCommandTest {
additionalParams += " --verboseRootPath /a";
try {
- testGc(dump, data, 0, false);
+ testGc(dump, data, 0, false, false);
} catch (OptionException e) {
assertTrue(e.getMessage().equals("Option(s) [verboseRootPath] are unavailable " +
"given other options on the command line"));
@@ -570,7 +588,7 @@ public class DataStoreCommandTest {
Data data = prepareData(storeFixture, blobFixture, 10, 0, 1);
storeFixture.close();
- testGc(dump, data, 0, false);
+ testGc(dump, data, 0, false, false);
}
@Test
@@ -579,7 +597,7 @@ public class DataStoreCommandTest {
Data data = prepareData(storeFixture, blobFixture, 10, 5, 1);
storeFixture.close();
- testGc(dump, data, 10000, false);
+ testGc(dump, data, 10000, false, false);
}
@Test
@@ -588,7 +606,7 @@ public class DataStoreCommandTest {
Data data = prepareData(storeFixture, blobFixture, 10, 5, 1);
storeFixture.close();
- testGc(dump, data, 10000, true);
+ testGc(dump, data, 10000, true, false);
}
@Test
@@ -607,7 +625,7 @@ public class DataStoreCommandTest {
storeFixture.close();
- testGc(dump, data, 0, false);
+ testGc(dump, data, 0, false, false);
}
@Test
@@ -744,7 +762,7 @@ public class DataStoreCommandTest {
}
- private void testGc(File dump, Data data, long maxAge, boolean markOnly) throws Exception {
+ private void testGc(File dump, Data data, long maxAge, boolean markOnly, boolean refsOld) throws Exception {
List<String> argsList = Lists
.newArrayList("--collect-garbage", String.valueOf(markOnly), "--max-age", String.valueOf(maxAge),
"--" + getOption(blobFixture.getType()), blobFixture.getConfigPath(),
@@ -757,14 +775,14 @@ public class DataStoreCommandTest {
DataStoreCommand cmd = new DataStoreCommand();
cmd.execute(argsList.toArray(new String[0]));
- if (!markOnly) {
+ if (!markOnly && !refsOld) {
assertFileEquals(dump, "avail-", Sets.difference(data.added, data.missingDataStore));
} else {
assertFileNull(dump, "avail-");
}
assertFileEquals(dump, "marked-", Sets.difference(data.added, data.deleted));
- if (!markOnly) {
+ if (!markOnly && !refsOld) {
assertFileEquals(dump, "gccand-", data.deleted);
} else {
assertFileNull(dump, "gccand-");