You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2015/08/06 05:36:20 UTC
svn commit: r1694393 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/blob/
test/java/org/apache/jackrabbit/oak/plugins/document/
test/java/org/apache/jackrabbit/oak/plugins/segment/
Author: amitj
Date: Thu Aug 6 03:36:20 2015
New Revision: 1694393
URL: http://svn.apache.org/r1694393
Log:
OAK-3174: [Blob GC] Make actual deletion of blobs synchronous
Removed the concurrent requests to a synchronous request for deletion of blobs from the blob store
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1694393&r1=1694392&r2=1694393&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Thu Aug 6 03:36:20 2015
@@ -316,13 +316,13 @@ public class MarkSweepGarbageCollector i
if (ids.size() > getBatchCount()) {
count += ids.size();
- executor.execute(new Sweeper(ids, exceptionQueue, earliestRefAvailTime));
+ sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
ids = Lists.newArrayList();
}
}
if (!ids.isEmpty()) {
count += ids.size();
- executor.execute(new Sweeper(ids, exceptionQueue, earliestRefAvailTime));
+ sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
}
count -= exceptionQueue.size();
@@ -367,41 +367,26 @@ public class MarkSweepGarbageCollector i
ids.clear();
writer.flush();
}
-
+
/**
- * Sweeper thread.
+ * Deletes a batch of blobs from blob store.
+ *
+ * @param ids
+ * @param exceptionQueue
+ * @param maxModified
*/
- class Sweeper implements Runnable {
-
- /** The exception queue. */
- private final ConcurrentLinkedQueue<String> exceptionQueue;
-
- /** The ids to sweep. */
- private final List<String> ids;
-
- private final long maxModified;
-
- public Sweeper(List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue,
- long maxModified) {
- this.exceptionQueue = exceptionQueue;
- this.ids = ids;
- this.maxModified = maxModified;
- }
-
- @Override
- public void run() {
- try {
- LOG.debug("Blob ids to be deleted {}", ids);
- boolean deleted = blobStore.deleteChunks(ids, getLastMaxModifiedTime(maxModified));
- if (!deleted) {
- // Only log and do not add to exception queue since some blobs may not match the
- // lastMaxModifiedTime criteria.
- LOG.debug("Some blobs were not deleted from the batch : [{}]", ids);
- }
- } catch (Exception e) {
- LOG.warn("Error occurred while deleting blob with ids [{}]", ids, e);
- exceptionQueue.addAll(ids);
+ private void sweepInternal(List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue, long maxModified) {
+ try {
+ LOG.debug("Blob ids to be deleted {}", ids);
+ boolean deleted = blobStore.deleteChunks(ids, getLastMaxModifiedTime(maxModified));
+ if (!deleted) {
+ // Only log and do not add to exception queue since some blobs may not match the
+ // lastMaxModifiedTime criteria.
+ LOG.debug("Some blobs were not deleted from the batch : [{}]", ids);
}
+ } catch (Exception e) {
+ LOG.warn("Error occurred while deleting blob with ids [{}]", ids, e);
+ exceptionQueue.addAll(ids);
}
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1694393&r1=1694392&r2=1694393&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java Thu Aug 6 03:36:20 2015
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.plugins.document;
import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
@@ -26,6 +27,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
@@ -158,16 +161,16 @@ public class MongoBlobGCTest extends Abs
new ByteArrayInputStream(new byte[0]),
REPOSITORY.getNameFromId(repoId));
}
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
new DocumentBlobReferenceRetriever(store),
- (GarbageCollectableBlobStore) store.getBlobStore(),
- MoreExecutors.sameThreadExecutor(),
- "./target", 5, 0, repoId);
+ (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 5, 0, repoId);
Thread.sleep(4000);
gc.collectGarbage(false);
-
+
+ assertEquals(0, executor.getTaskCount());
Set<String> existingAfterGC = iterate();
- boolean empty = Sets.symmetricDifference(remaining, existingAfterGC).isEmpty();
+ boolean empty = Sets.symmetricDifference(remaining, existingAfterGC).isEmpty();
assertTrue(empty);
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1694393&r1=1694392&r2=1694393&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java Thu Aug 6 03:36:20 2015
@@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.plugin
import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
@@ -33,6 +34,8 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
@@ -203,14 +206,15 @@ public class SegmentDataStoreBlobGCTest
new ByteArrayInputStream(new byte[0]),
REPOSITORY.getNameFromId(repoId));
}
-
+
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
new SegmentBlobReferenceRetriever(store.getTracker()),
- (GarbageCollectableBlobStore) store.getBlobStore(),
- MoreExecutors.sameThreadExecutor(),
+ (GarbageCollectableBlobStore) store.getBlobStore(), executor,
"./target", 2048, 0, repoId);
gc.collectGarbage(false);
+ assertEquals(0, executor.getTaskCount());
Set<String> existingAfterGC = iterate();
log.info("{} blobs that should have remained after gc : {}", remaining.size(), remaining);
log.info("{} blobs existing after gc : {}", existingAfterGC.size(), existingAfterGC);