You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2015/09/25 12:41:36 UTC
svn commit: r1705268 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/blob/
test/java/org/apache/jackrabbit/oak/plugins/document/
Author: amitj
Date: Fri Sep 25 10:41:36 2015
New Revision: 1705268
URL: http://svn.apache.org/viewvc?rev=1705268&view=rev
Log:
OAK-3443: Track the start time of the mark phase in blob GC
Added a test case that simulates a long-running mark phase: blobs are added after the references have been identified, and the sweep is then delayed until those blobs exceed the max age; these blobs must still not be collected
Changed some methods to protected (and GarbageCollectorFileState to public) so that the test can override and call them
Minor code rearrangement
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java?rev=1705268&r1=1705267&r2=1705268&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java Fri Sep 25 10:41:36 2015
@@ -36,7 +36,7 @@ import org.apache.jackrabbit.oak.commons
* Also, manages any temporary files needed as well as external sorting.
*
*/
-class GarbageCollectorFileState implements Closeable{
+public class GarbageCollectorFileState implements Closeable{
/** The root of the gc file state directory. */
private final File home;
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1705268&r1=1705267&r2=1705268&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Fri Sep 25 10:41:36 2015
@@ -51,7 +51,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.PeekingIterator;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
-
import com.google.common.util.concurrent.ListenableFutureTask;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
@@ -235,16 +234,17 @@ public class MarkSweepGarbageCollector i
* @param markOnly whether to mark only
* @throws Exception the exception
*/
- private void markAndSweep(boolean markOnly) throws Exception {
+ protected void markAndSweep(boolean markOnly) throws Exception {
boolean threw = true;
GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
try {
Stopwatch sw = Stopwatch.createStarted();
LOG.info("Starting Blob garbage collection");
-
+
+ long markStart = System.currentTimeMillis();
mark(fs);
if (!markOnly) {
- long deleteCount = sweep(fs);
+ long deleteCount = sweep(fs, markStart);
threw = false;
LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
@@ -261,7 +261,7 @@ public class MarkSweepGarbageCollector i
* Mark phase of the GC.
* @param fs the garbage collector file state
*/
- private void mark(GarbageCollectorFileState fs) throws IOException, DataStoreException {
+ protected void mark(GarbageCollectorFileState fs) throws IOException, DataStoreException {
LOG.debug("Starting mark phase of the garbage collector");
// Create a time marker in the data store if applicable
@@ -351,8 +351,9 @@ public class MarkSweepGarbageCollector i
* @return the number of blobs deleted
* @throws Exception the exception
* @param fs the garbage collector file state
+ * @param markStart the start time of mark to take as reference for deletion
*/
- private long sweep(GarbageCollectorFileState fs) throws Exception {
+ protected long sweep(GarbageCollectorFileState fs, long markStart) throws Exception {
long earliestRefAvailTime;
// Merge all the blob references available from all the reference files in the data store meta store
// Only go ahead if merge succeeded
@@ -360,6 +361,7 @@ public class MarkSweepGarbageCollector i
earliestRefAvailTime =
GarbageCollectionType.get(blobStore).mergeAllMarkedReferences(blobStore, fs);
LOG.debug("Earliest reference available for timestamp [{}]", earliestRefAvailTime);
+ earliestRefAvailTime = (earliestRefAvailTime < markStart ? earliestRefAvailTime : markStart);
} catch (Exception e) {
return 0;
}
@@ -371,10 +373,11 @@ public class MarkSweepGarbageCollector i
difference(fs);
long count = 0;
long deleted = 0;
-
+
+ long lastMaxModifiedTime = getLastMaxModifiedTime(earliestRefAvailTime);
LOG.debug("Starting sweep phase of the garbage collector");
LOG.debug("Sweeping blobs with modified time > than the configured max deleted time ({}). ",
- timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)));
+ timestampToString(lastMaxModifiedTime));
ConcurrentLinkedQueue<String> exceptionQueue = new ConcurrentLinkedQueue<String>();
@@ -387,13 +390,13 @@ public class MarkSweepGarbageCollector i
if (ids.size() >= getBatchCount()) {
count += ids.size();
- deleted += sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
+ deleted += sweepInternal(ids, exceptionQueue, lastMaxModifiedTime);
ids = newArrayList();
}
}
if (!ids.isEmpty()) {
count += ids.size();
- deleted += sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
+ deleted += sweepInternal(ids, exceptionQueue, lastMaxModifiedTime);
}
BufferedWriter writer = null;
@@ -416,7 +419,7 @@ public class MarkSweepGarbageCollector i
LOG.warn("Deleted only [{}] blobs entries from the [{}] candidates identified. This may happen if blob "
+ "modified time is > "
+ "than the max deleted time ({})", deleted, count,
- timestampToString(getLastMaxModifiedTime(earliestRefAvailTime)));
+ timestampToString(lastMaxModifiedTime));
}
// Remove all the merged marked references
@@ -456,7 +459,7 @@ public class MarkSweepGarbageCollector i
long deleted = 0;
try {
LOG.trace("Blob ids to be deleted {}", ids);
- deleted = blobStore.countDeleteChunks(ids, getLastMaxModifiedTime(maxModified));
+ deleted = blobStore.countDeleteChunks(ids, maxModified);
if (deleted != ids.size()) {
// Only log and do not add to exception queue since some blobs may not match the
// lastMaxModifiedTime criteria.
@@ -473,7 +476,7 @@ public class MarkSweepGarbageCollector i
* Iterates the complete node tree and collect all blob references
* @param fs the garbage collector file state
*/
- private void iterateNodeTree(GarbageCollectorFileState fs) throws IOException {
+ protected void iterateNodeTree(GarbageCollectorFileState fs) throws IOException {
final BufferedWriter writer = Files.newWriter(fs.getMarkedRefs(), Charsets.UTF_8);
final AtomicInteger count = new AtomicInteger();
try {
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1705268&r1=1705267&r2=1705268&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java Fri Sep 25 10:41:36 2015
@@ -21,23 +21,29 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import junit.framework.Assert;
import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.plugins.blob.BlobReferenceRetriever;
+import org.apache.jackrabbit.oak.plugins.blob.GarbageCollectorFileState;
import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
@@ -51,12 +57,17 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
/**
* Tests for MongoMK GC
*/
public class MongoBlobGCTest extends AbstractMongoConnectionTest {
private Clock clock;
+ private static final Logger log = LoggerFactory.getLogger(MongoBlobGCTest.class);
public DataStoreState setUp(boolean deleteDirect) throws Exception {
DocumentNodeStore s = mk.getNodeStore();
@@ -122,7 +133,7 @@ public class MongoBlobGCTest extends Abs
Set<String> blobsPresent = Sets.newHashSet();
}
- public HashSet<String> addInlined() throws Exception {
+ private HashSet<String> addInlined() throws Exception {
HashSet<String> set = new HashSet<String>();
DocumentNodeStore s = mk.getNodeStore();
NodeBuilder a = s.getRoot().builder();
@@ -134,6 +145,7 @@ public class MongoBlobGCTest extends Abs
s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
return set;
}
+
private void deleteFromMongo(String nodeId) {
DBCollection coll = mongoConnection.getDB().getCollection("nodes");
BasicDBObject blobNodeObj = new BasicDBObject();
@@ -237,7 +249,33 @@ public class MongoBlobGCTest extends Abs
Iterator<ReferencedBlob> blobs = mk.getNodeStore().getReferencedBlobsIterator();
assertTrue(blobs instanceof MongoBlobReferenceIterator);
}
-
+
+ @Test
+ public void gcLongRunningBlobCollection() throws Exception {
+ DataStoreState state = setUp(true);
+ log.info("{} Blobs added {}", state.blobsAdded.size(), state.blobsAdded);
+ log.info("{} Blobs should be present {}", state.blobsPresent.size(), state.blobsPresent);
+
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
+ DocumentNodeStore store = mk.getNodeStore();
+ String repoId = null;
+ if (SharedDataStoreUtils.isShared(store.getBlobStore())) {
+ repoId = ClusterRepositoryInfo.createId(store);
+ ((SharedDataStore) store.getBlobStore()).addMetadataRecord(
+ new ByteArrayInputStream(new byte[0]),
+ REPOSITORY.getNameFromId(repoId));
+ }
+ TestGarbageCollector gc = new TestGarbageCollector(
+ new DocumentBlobReferenceRetriever(store),
+ (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 5, 5000, repoId);
+ gc.collectGarbage(false);
+ Set<String> existingAfterGC = iterate();
+ log.info("{} Blobs existing after gc {}", existingAfterGC.size(), existingAfterGC);
+
+ assertTrue(Sets.difference(state.blobsPresent, existingAfterGC).isEmpty());
+ assertEquals(gc.additionalBlobs, Sets.symmetricDifference(state.blobsPresent, existingAfterGC));
+ }
+
private Set<String> gc(int blobGcMaxAgeInSecs) throws Exception {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gc = init(blobGcMaxAgeInSecs, executor);
@@ -287,4 +325,82 @@ public class MongoBlobGCTest extends Abs
clock.waitUntil(Revision.getCurrentTimestamp());
return clock;
}
+
+ /**
+ * Waits for some time and adds additional blobs after blob referenced identified to simulate
+ * long running blob id collection phase.
+ */
+ class TestGarbageCollector extends MarkSweepGarbageCollector {
+ long maxLastModifiedInterval;
+ String root;
+ GarbageCollectableBlobStore blobStore;
+ Set<String> additionalBlobs;
+
+ public TestGarbageCollector(BlobReferenceRetriever marker, GarbageCollectableBlobStore blobStore,
+ Executor executor, String root, int batchCount, long maxLastModifiedInterval,
+ @Nullable String repositoryId) throws IOException {
+ super(marker, blobStore, executor, root, batchCount, maxLastModifiedInterval, repositoryId);
+ this.root = root;
+ this.blobStore = blobStore;
+ this.maxLastModifiedInterval = maxLastModifiedInterval;
+ this.additionalBlobs = Sets.newHashSet();
+ }
+
+ @Override
+ protected void markAndSweep(boolean markOnly) throws Exception {
+ boolean threw = true;
+ GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
+ try {
+ Stopwatch sw = Stopwatch.createStarted();
+ LOG.info("Starting Test Blob garbage collection");
+
+ // Sleep a little more than the max interval to get over the interval for valid blobs
+ Thread.sleep(maxLastModifiedInterval + 100);
+ LOG.info("Slept {} to make blobs old", maxLastModifiedInterval + 100);
+
+ long markStart = System.currentTimeMillis();
+ mark(fs);
+ LOG.info("Mark finished");
+
+ additionalBlobs = createAdditional();
+
+ if (!markOnly) {
+ Thread.sleep(maxLastModifiedInterval + 100);
+ LOG.info("Slept {} to make additional blobs old", maxLastModifiedInterval + 100);
+
+ long deleteCount = sweep(fs, markStart);
+ threw = false;
+
+ LOG.info("Blob garbage collection completed in {}. Number of blobs deleted [{}]", sw.toString(),
+ deleteCount, maxLastModifiedInterval);
+ }
+ } finally {
+ if (!LOG.isTraceEnabled()) {
+ Closeables.close(fs, threw);
+ }
+ }
+ }
+
+ public HashSet<String> createAdditional() throws Exception {
+ HashSet<String> blobSet = new HashSet<String>();
+ DocumentNodeStore s = mk.getNodeStore();
+ NodeBuilder a = s.getRoot().builder();
+ int number = 5;
+ for (int i = 0; i < number; i++) {
+ Blob b = s.createBlob(randomStream(100 + i, 16516));
+ a.child("cafter" + i).setProperty("x", b);
+ Iterator<String> idIter =
+ ((GarbageCollectableBlobStore) s.getBlobStore())
+ .resolveChunks(b.toString());
+ while (idIter.hasNext()) {
+ String chunk = idIter.next();
+ blobSet.add(chunk);
+ }
+ }
+ log.info("{} Additional created {}", blobSet.size(), blobSet);
+
+ s.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ return blobSet;
+ }
+ }
}