You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2015/09/25 16:19:17 UTC
svn commit: r1705312 - in /jackrabbit/oak/branches/1.2: ./
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/
Author: amitj
Date: Fri Sep 25 14:19:16 2015
New Revision: 1705312
URL: http://svn.apache.org/viewvc?rev=1705312&view=rev
Log:
OAK-3443: Track the start time of mark in GC
Merged revision(s) 1705268, 1705273 from trunk
Modified:
jackrabbit/oak/branches/1.2/ (props changed)
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
Propchange: jackrabbit/oak/branches/1.2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 25 14:19:16 2015
@@ -1,3 +1,3 @@
/jackrabbit/oak/branches/1.0:1665962
-/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673713,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678124,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680172,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1681955,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820
,1684868,1685023,1685075,1685370,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689831,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691139,1691151,1691159,1691167,1691183,1691188,1691210,1691280,1691307,1691331-1691333,1691345,1691384-1691385,1691401,1691509,1692133-1692134,1692156,1692250,1692274,1692363,1692382,1692478,1692955,1693002,1693030,1693209,1693421,1693525-1693526,1694007,1694393-1694394,1695050,1695122,1695280,1695299,1695457,1695482,1695507,1695521,1695540,1696194,1696242,1696285,1696578,1696759,1696916,1697363,1697373,1697410,1697
582,1697589,1697616,1697672,1700191,1700231,1700397,1700403,1700506,1700571,1700727,1700749,1700769,1700775,1701065,1701619,1701733,1701743,1701750,1701768,1701806,1701810,1701814,1701948,1701955,1701959,1701965,1701986,1702022,1702272,1702387,1702405,1702423,1702860,1702942,1702960,1703212,1703382,1703395,1703411,1703428,1703430,1703568,1703592,1703758,1703858,1703878,1704256,1704282,1704285,1704457,1704479,1704490,1704614,1704629,1704636,1704655,1704670,1704886,1705027,1705043,1705055
+/jackrabbit/oak/trunk:1672350,1672468,1672537,1672603,1672642,1672644,1672834-1672835,1673351,1673410,1673414-1673415,1673436,1673644,1673662-1673664,1673669,1673695,1673713,1673738,1673787,1673791,1674046,1674065,1674075,1674107,1674228,1674780,1674880,1675054-1675055,1675319,1675332,1675354,1675357,1675382,1675555,1675566,1675593,1676198,1676237,1676407,1676458,1676539,1676670,1676693,1676703,1676725,1677579,1677581,1677609,1677611,1677774,1677788,1677797,1677804,1677806,1677939,1677991,1678023,1678095-1678096,1678124,1678171,1678173,1678211,1678323,1678758,1678938,1678954,1679144,1679165,1679191,1679232,1679235,1679503,1679958,1679961,1680170,1680172,1680182,1680222,1680232,1680236,1680461,1680633,1680643,1680747,1680805-1680806,1680903,1681282,1681767,1681918,1681955,1682042,1682218,1682235,1682437,1682494,1682555,1682855,1682904,1683059,1683089,1683213,1683249,1683259,1683278,1683323,1683687,1683700,1684174-1684175,1684186,1684376,1684442,1684561,1684570,1684601,1684618,1684820
,1684868,1685023,1685075,1685370,1685552,1685589-1685590,1685840,1685964,1685977,1685989,1685999,1686023,1686032,1686097,1686162,1686229,1686234,1686253,1686414,1686780,1686854,1686857,1686971,1687053-1687055,1687175,1687196,1687198,1687220,1687239-1687240,1687301,1687441,1687553,1688089-1688090,1688172,1688179,1688349,1688421,1688436,1688453,1688616,1688622,1688634,1688636,1688817,1689003-1689004,1689008,1689577,1689581,1689623,1689810,1689828,1689831,1689833,1689903,1690017,1690043,1690047,1690057,1690247,1690249,1690634-1690637,1690650,1690669,1690674,1690885,1690941,1691139,1691151,1691159,1691167,1691183,1691188,1691210,1691280,1691307,1691331-1691333,1691345,1691384-1691385,1691401,1691509,1692133-1692134,1692156,1692250,1692274,1692363,1692382,1692478,1692955,1693002,1693030,1693209,1693421,1693525-1693526,1694007,1694393-1694394,1695050,1695122,1695280,1695299,1695457,1695482,1695507,1695521,1695540,1696194,1696242,1696285,1696578,1696759,1696916,1697363,1697373,1697410,1697
582,1697589,1697616,1697672,1700191,1700231,1700397,1700403,1700506,1700571,1700727,1700749,1700769,1700775,1701065,1701619,1701733,1701743,1701750,1701768,1701806,1701810,1701814,1701948,1701955,1701959,1701965,1701986,1702022,1702272,1702387,1702405,1702423,1702860,1702942,1702960,1703212,1703382,1703395,1703411,1703428,1703430,1703568,1703592,1703758,1703858,1703878,1704256,1704282,1704285,1704457,1704479,1704490,1704614,1704629,1704636,1704655,1704670,1704886,1705027,1705043,1705055,1705268,1705273
/jackrabbit/trunk:1345480
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java?rev=1705312&r1=1705311&r2=1705312&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java Fri Sep 25 14:19:16 2015
@@ -36,7 +36,7 @@ import org.apache.jackrabbit.oak.commons
* Also, manages any temporary files needed as well as external sorting.
*
*/
-class GarbageCollectorFileState implements Closeable{
+public class GarbageCollectorFileState implements Closeable{
/** The root of the gc file state directory. */
private final File home;
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1705312&r1=1705311&r2=1705312&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Fri Sep 25 14:19:16 2015
@@ -49,7 +49,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.PeekingIterator;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
-
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.jackrabbit.core.data.DataRecord;
@@ -229,16 +228,17 @@ public class MarkSweepGarbageCollector i
* @param markOnly whether to mark only
* @throws Exception the exception
*/
- private void markAndSweep(boolean markOnly) throws Exception {
+ protected void markAndSweep(boolean markOnly) throws Exception {
boolean threw = true;
GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
try {
Stopwatch sw = Stopwatch.createStarted();
LOG.info("Starting Blob garbage collection");
-
+
+ long markStart = System.currentTimeMillis();
mark(fs);
if (!markOnly) {
- long deleteCount = sweep(fs);
+ long deleteCount = sweep(fs, markStart);
threw = false;
LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
@@ -255,7 +255,7 @@ public class MarkSweepGarbageCollector i
* Mark phase of the GC.
* @param fs the garbage collector file state
*/
- private void mark(GarbageCollectorFileState fs) throws IOException, DataStoreException {
+ protected void mark(GarbageCollectorFileState fs) throws IOException, DataStoreException {
LOG.debug("Starting mark phase of the garbage collector");
// Create a time marker in the data store if applicable
@@ -341,8 +341,9 @@ public class MarkSweepGarbageCollector i
* @return the number of blobs deleted
* @throws Exception the exception
* @param fs the garbage collector file state
+ * @param markStart the start time of mark to take as reference for deletion
*/
- private long sweep(GarbageCollectorFileState fs) throws Exception {
+ protected long sweep(GarbageCollectorFileState fs, long markStart) throws Exception {
long earliestRefAvailTime;
// Merge all the blob references available from all the reference files in the data store meta store
// Only go ahead if merge succeeded
@@ -350,6 +351,7 @@ public class MarkSweepGarbageCollector i
earliestRefAvailTime =
GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs);
LOG.debug("Earliest reference available for timestamp [{}]", earliestRefAvailTime);
+ earliestRefAvailTime = (earliestRefAvailTime < markStart ? earliestRefAvailTime : markStart);
} catch (Exception e) {
return 0;
}
@@ -361,10 +363,11 @@ public class MarkSweepGarbageCollector i
difference(fs);
long count = 0;
long deleted = 0;
-
+
+ long lastMaxModifiedTime = getLastMaxModifiedTime(earliestRefAvailTime);
LOG.debug("Starting sweep phase of the garbage collector");
LOG.debug("Sweeping blobs with modified time > than the configured max deleted time ({}). " +
- timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)));
+ timestampToString(lastMaxModifiedTime));
ConcurrentLinkedQueue<String> exceptionQueue = new ConcurrentLinkedQueue<String>();
@@ -377,13 +380,13 @@ public class MarkSweepGarbageCollector i
if (ids.size() >= getBatchCount()) {
count += ids.size();
- deleted += sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
+ deleted += sweepInternal(ids, exceptionQueue, lastMaxModifiedTime);
ids = newArrayList();
}
}
if (!ids.isEmpty()) {
count += ids.size();
- deleted += sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
+ deleted += sweepInternal(ids, exceptionQueue, lastMaxModifiedTime);
}
BufferedWriter writer = null;
@@ -406,7 +409,7 @@ public class MarkSweepGarbageCollector i
LOG.warn("Deleted only [{}] blobs entries from the [{}] candidates identified. This may happen if blob "
+ "modified time is > "
+ "than the max deleted time ({})", deleted, count,
- timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)));
+ timestampToString(lastMaxModifiedTime));
}
// Remove all the merged marked references
@@ -446,7 +449,7 @@ public class MarkSweepGarbageCollector i
long deleted = 0;
try {
LOG.trace("Blob ids to be deleted {}", ids);
- deleted = blobStore.countDeleteChunks(ids, getLastMaxModifiedTime(maxModified));
+ deleted = blobStore.countDeleteChunks(ids, maxModified);
if (deleted != ids.size()) {
// Only log and do not add to exception queue since some blobs may not match the
// lastMaxModifiedTime criteria.
@@ -463,7 +466,7 @@ public class MarkSweepGarbageCollector i
* Iterates the complete node tree and collect all blob references
* @param fs the garbage collector file state
*/
- private void iterateNodeTree(GarbageCollectorFileState fs) throws IOException {
+ protected void iterateNodeTree(GarbageCollectorFileState fs) throws IOException {
final BufferedWriter writer = Files.newWriter(fs.getMarkedRefs(), Charsets.UTF_8);
final AtomicInteger count = new AtomicInteger();
try {
Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1705312&r1=1705311&r2=1705312&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java Fri Sep 25 14:19:16 2015
@@ -21,22 +21,29 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import junit.framework.Assert;
import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.GarbageCollectorFileState;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
@@ -48,12 +55,17 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
/**
* Tests for MongoMK GC
*/
public class MongoBlobGCTest extends AbstractMongoConnectionTest {
private Clock clock;
+ private static final Logger log = LoggerFactory.getLogger(MongoBlobGCTest.class);
public DataStoreState setUp(boolean deleteDirect) throws Exception {
DocumentNodeStore s = mk.getNodeStore();
@@ -115,7 +127,7 @@ public class MongoBlobGCTest extends Abs
Set<String> blobsPresent = Sets.newHashSet();
}
- public HashSet<String> addInlined() throws Exception {
+ private HashSet<String> addInlined() throws Exception {
HashSet<String> set = new HashSet<String>();
DocumentNodeStore s = mk.getNodeStore();
NodeBuilder a = s.getRoot().builder();
@@ -127,6 +139,7 @@ public class MongoBlobGCTest extends Abs
s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
return set;
}
+
private void deleteFromMongo(String nodeId) {
DBCollection coll = mongoConnection.getDB().getCollection("nodes");
BasicDBObject blobNodeObj = new BasicDBObject();
@@ -169,10 +182,37 @@ public class MongoBlobGCTest extends Abs
Set<String> existingAfterGC = gc(0);
assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
}
-
+
+ @Test
+ public void gcLongRunningBlobCollection() throws Exception {
+ DataStoreState state = setUp(true);
+ log.info("{} Blobs added {}", state.blobsAdded.size(), state.blobsAdded);
+ log.info("{} Blobs should be present {}", state.blobsPresent.size(), state.blobsPresent);
+
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
+ DocumentNodeStore store = mk.getNodeStore();
+ String repoId = null;
+ if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+ repoId = ClusterRepositoryInfo.createId(store);
+ ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+ new ByteArrayInputStream(new byte[0]),
+ REPOSITORY.getNameFromId(repoId));
+ }
+ TestGarbageCollector gc = new TestGarbageCollector(
+ new DocumentBlobReferenceRetriever(store),
+ (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 5, 5000, repoId);
+ gc.collectGarbage(false);
+ Set<String> existingAfterGC = iterate();
+ log.info("{} Blobs existing after gc {}", existingAfterGC.size(), existingAfterGC);
+
+ assertTrue(Sets.difference(state.blobsPresent, existingAfterGC).isEmpty());
+ assertEquals(gc.additionalBlobs, Sets.symmetricDifference(state.blobsPresent, existingAfterGC));
+ }
+
private Set<String> gc(int blobGcMaxAgeInSecs) throws Exception {
DocumentNodeStore store = mk.getNodeStore();
String repoId = null;
+
if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
repoId = ClusterRepositoryInfo.createId(store);
((SharedDataStore) store.getBlobStore()).addMetadataRecord(
@@ -214,4 +254,82 @@ public class MongoBlobGCTest extends Abs
clock.waitUntil(Revision.getCurrentTimestamp());
return clock;
}
+
+ /**
+     * Waits for some time and adds additional blobs after blob references are identified to simulate
+ * long running blob id collection phase.
+ */
+ class TestGarbageCollector extends MarkSweepGarbageCollector {
+ long maxLastModifiedInterval;
+ String root;
+ GarbageCollectableBlobStore blobStore;
+ Set<String> additionalBlobs;
+
+ public TestGarbageCollector(BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore,
+ Executor executor, String root, int batchCount, long maxLastModifiedInterval,
+ @Nullable String repositoryId) throws IOException {
+ super(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, repositoryId);
+ this.root = root;
+ this.blobStore = blobStore;
+ this.maxLastModifiedInterval = maxLastModifiedInterval;
+ this.additionalBlobs = Sets.newHashSet();
+ }
+
+ @Override
+ protected void markAndSweep(boolean markOnly) throws Exception {
+ boolean threw = true;
+ GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
+ try {
+ Stopwatch sw = Stopwatch.createStarted();
+ LOG.info("Starting Test Blob garbage collection");
+
+ // Sleep a little more than the max interval to get over the interval for valid blobs
+ Thread.sleep(maxLastModifiedInterval + 100);
+ LOG.info("Slept {} to make blobs old", maxLastModifiedInterval + 100);
+
+ long markStart = System.currentTimeMillis();
+ mark(fs);
+ LOG.info("Mark finished");
+
+ additionalBlobs = createAdditional();
+
+ if (!markOnly) {
+ Thread.sleep(maxLastModifiedInterval + 100);
+ LOG.info("Slept {} to make additional blobs old", maxLastModifiedInterval + 100);
+
+ long deleteCount = sweep(fs, markStart);
+ threw = false;
+
+ LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
+ deleteCount, maxLastModifiedInterval);
+ }
+ } finally {
+ if (!LOG.isTraceEnabled()) {
+ Closeables.close(fs, threw);
+ }
+ }
+ }
+
+ public HashSet<String> createAdditional() throws Exception {
+ HashSet<String> blobSet = new HashSet<String>();
+ DocumentNodeStore s = mk.getNodeStore();
+ NodeBuilder a = s.getRoot().builder();
+ int number = 5;
+ for (int i = 0; i < number; i++) {
+ Blob b = s.createBlob(randomStream(100 + i, 16516));
+ a.child("cafter" + i).setProperty("x", b);
+ Iterator<String> idIter =
+ ((GarbageCollectableBlobStore) s.getBlobStore())
+ .resolveChunks(b.toString());
+ while (idIter.hasNext()) {
+ String chunk = idIter.next();
+ blobSet.add(chunk);
+ }
+ }
+ log.info("{} Additional created {}", blobSet.size(), blobSet);
+
+ s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ return blobSet;
+ }
+ }
}
Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1705312&r1=1705311&r2=1705312&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java Fri Sep 25 14:19:16 2015
@@ -22,6 +22,7 @@ import static org.apache.commons.io.File
import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
import java.io.ByteArrayInputStream;
import java.io.File;
@@ -34,14 +35,22 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
import org.apache.commons.io.FileUtils;
import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.GarbageCollectorFileState;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
@@ -60,8 +69,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nonnull;
-
/**
* Tests for SegmentNodeStore DataStore GC
*/
@@ -98,7 +105,7 @@ public class SegmentDataStoreBlobGCTest
blobStore = DataStoreUtils.getBlobStore();
nodeStore = getNodeStore(blobStore);
startDate = new Date();
-
+
NodeBuilder a = nodeStore.getRoot().builder();
/* Create garbage by creating in-lined blobs (size < 16KB) */
@@ -136,7 +143,7 @@ public class SegmentDataStoreBlobGCTest
processed.add(n);
}
}
-
+
DataStoreState state = new DataStoreState();
for (int i = 0; i < numBlobs; i++) {
SegmentBlob b = (SegmentBlob) nodeStore.createBlob(randomStream(i, 16516));
@@ -150,7 +157,7 @@ public class SegmentDataStoreBlobGCTest
}
a.child("c" + i).setProperty("x", b);
}
-
+
nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
log.info("Created blobs : {}", state.blobsAdded.size());
@@ -166,19 +173,19 @@ public class SegmentDataStoreBlobGCTest
return state;
}
-
+
private class DataStoreState {
Set<String> blobsAdded = Sets.newHashSet();
Set<String> blobsPresent = Sets.newHashSet();
}
-
+
private void delete(String nodeId) throws CommitFailedException {
NodeBuilder builder = nodeStore.getRoot().builder();
builder.child(nodeId).remove();
nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
-
+
@Test
public void gc() throws Exception {
DataStoreState state = setUp();
@@ -187,7 +194,7 @@ public class SegmentDataStoreBlobGCTest
Set<String> existingAfterGC = gcInternal(0);
assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty());
}
-
+
@Test
public void noGc() throws Exception {
DataStoreState state = setUp();
@@ -196,7 +203,7 @@ public class SegmentDataStoreBlobGCTest
Set<String> existingAfterGC = gcInternal(86400);
assertTrue(Sets.symmetricDifference(state.blobsAdded, existingAfterGC).isEmpty());
}
-
+
private Set<String> gcInternal(long maxBlobGcInSecs) throws Exception {
String repoId = null;
if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
@@ -205,11 +212,11 @@ public class SegmentDataStoreBlobGCTest
new ByteArrayInputStream(new byte[0]),
REPOSITORY.getNameFromId(repoId));
}
-
+
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
new SegmentBlobReferenceRetriever(store.getTracker()),
- (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 2048, maxBlobGcInSecs,
+ (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 2048, maxBlobGcInSecs,
repoId);
gc.collectGarbage(false);
@@ -219,6 +226,31 @@ public class SegmentDataStoreBlobGCTest
return existingAfterGC;
}
+ @Test
+ public void gcLongRunningBlobCollection() throws Exception {
+ DataStoreState state = setUp();
+ log.info("{} Blobs added {}", state.blobsAdded.size(), state.blobsAdded);
+ log.info("{} Blobs should be present {}", state.blobsPresent.size(), state.blobsPresent);
+
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
+ String repoId = null;
+ if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+ repoId = ClusterRepositoryInfo.createId(nodeStore);
+ ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+ new ByteArrayInputStream(new byte[0]),
+ REPOSITORY.getNameFromId(repoId));
+ }
+ TestGarbageCollector gc = new TestGarbageCollector(
+ new SegmentBlobReferenceRetriever(store.getTracker()),
+ (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 5, 5000, repoId);
+ gc.collectGarbage(false);
+ Set<String> existingAfterGC = iterate();
+ log.info("{} Blobs existing after gc {}", existingAfterGC.size(), existingAfterGC);
+
+ assertTrue(Sets.difference(state.blobsPresent, existingAfterGC).isEmpty());
+ assertEquals(gc.additionalBlobs, Sets.symmetricDifference(state.blobsPresent, existingAfterGC));
+ }
+
protected Set<String> iterate() throws Exception {
Iterator<String> cur = blobStore.getAllChunkIds(0);
@@ -245,5 +277,81 @@ public class SegmentDataStoreBlobGCTest
r.nextBytes(data);
return new ByteArrayInputStream(data);
}
+
+ /**
+     * Waits for some time and adds additional blobs after blob references are identified to simulate
+ * long running blob id collection phase.
+ */
+ class TestGarbageCollector extends MarkSweepGarbageCollector {
+ long maxLastModifiedInterval;
+ String root;
+ GarbageCollectableBlobStore blobStore;
+ Set<String> additionalBlobs;
+
+ public TestGarbageCollector(BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore,
+ Executor executor, String root, int batchCount, long maxLastModifiedInterval,
+ @Nullable String repositoryId) throws IOException {
+ super(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, repositoryId);
+ this.root = root;
+ this.blobStore = blobStore;
+ this.maxLastModifiedInterval = maxLastModifiedInterval;
+ this.additionalBlobs = Sets.newHashSet();
+ }
+
+ @Override
+ protected void markAndSweep(boolean markOnly) throws Exception {
+ boolean threw = true;
+ GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
+ try {
+ Stopwatch sw = Stopwatch.createStarted();
+ LOG.info("Starting Test Blob garbage collection");
+
+ // Sleep a little more than the max interval to get over the interval for valid blobs
+ Thread.sleep(maxLastModifiedInterval + 100);
+ LOG.info("Slept {} to make blobs old", maxLastModifiedInterval + 100);
+
+ long markStart = System.currentTimeMillis();
+ mark(fs);
+ LOG.info("Mark finished");
+
+ additionalBlobs = createAdditional();
+
+ if (!markOnly) {
+ Thread.sleep(maxLastModifiedInterval + 100);
+ LOG.info("Slept {} to make additional blobs old", maxLastModifiedInterval + 100);
+
+ long deleteCount = sweep(fs, markStart);
+ threw = false;
+
+ LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
+ deleteCount, maxLastModifiedInterval);
+ }
+ } finally {
+ if (!LOG.isTraceEnabled()) {
+ Closeables.close(fs, threw);
+ }
+ }
+ }
+
+ public HashSet<String> createAdditional() throws Exception {
+ HashSet<String> blobSet = new HashSet<String>();
+ NodeBuilder a = nodeStore.getRoot().builder();
+ int number = 5;
+ for (int i = 0; i < number; i++) {
+ SegmentBlob b = (SegmentBlob) nodeStore.createBlob(randomStream(100 + i, 16516));
+ a.child("cafter" + i).setProperty("x", b);
+ Iterator<String> idIter =
+ ((GarbageCollectableBlobStore) blobStore).resolveChunks(b.getBlobId());
+ while (idIter.hasNext()) {
+ String chunk = idIter.next();
+ blobSet.add(chunk);
+ }
+ }
+ log.info("{} Additional created {}", blobSet.size(), blobSet);
+
+ nodeStore.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ return blobSet;
+ }
+ }
}