You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2015/08/06 05:36:20 UTC

svn commit: r1694393 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/blob/ test/java/org/apache/jackrabbit/oak/plugins/document/ test/java/org/apache/jackrabbit/oak/plugins/segment/

Author: amitj
Date: Thu Aug  6 03:36:20 2015
New Revision: 1694393

URL: http://svn.apache.org/r1694393
Log:
OAK-3174: [Blob GC] Make actual deletion of blobs synchronous

Replaced the concurrent requests with a synchronous request for deletion of blobs from the blob store

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1694393&r1=1694392&r2=1694393&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Thu Aug  6 03:36:20 2015
@@ -316,13 +316,13 @@ public class MarkSweepGarbageCollector i
 
             if (ids.size() > getBatchCount()) {
                 count += ids.size();
-                executor.execute(new Sweeper(ids, exceptionQueue, earliestRefAvailTime));
+                sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
                 ids = Lists.newArrayList();
             }
         }
         if (!ids.isEmpty()) {
             count += ids.size();
-            executor.execute(new Sweeper(ids, exceptionQueue, earliestRefAvailTime));
+            sweepInternal(ids, exceptionQueue, earliestRefAvailTime);
         }
 
         count -= exceptionQueue.size();
@@ -367,41 +367,26 @@ public class MarkSweepGarbageCollector i
         ids.clear();
         writer.flush();
     }
-
+    
     /**
-     * Sweeper thread.
+     * Deletes a batch of blobs from blob store.
+     * 
+     * @param ids
+     * @param exceptionQueue
+     * @param maxModified
      */
-    class Sweeper implements Runnable {
-
-        /** The exception queue. */
-        private final ConcurrentLinkedQueue<String> exceptionQueue;
-
-        /** The ids to sweep. */
-        private final List<String> ids;
-
-        private final long maxModified;
-
-        public Sweeper(List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue,
-                long maxModified) {
-            this.exceptionQueue = exceptionQueue;
-            this.ids = ids;
-            this.maxModified = maxModified;
-        }
-
-        @Override
-        public void run() {
-            try {
-                LOG.debug("Blob ids to be deleted {}", ids);
-                boolean deleted = blobStore.deleteChunks(ids, getLastMaxModifiedTime(maxModified));
-                if (!deleted) {
-                    // Only log and do not add to exception queue since some blobs may not match the
-                    // lastMaxModifiedTime criteria.
-                    LOG.debug("Some blobs were not deleted from the batch : [{}]", ids);
-                }
-            } catch (Exception e) {
-                LOG.warn("Error occurred while deleting blob with ids [{}]", ids, e);
-                exceptionQueue.addAll(ids);
+    private void sweepInternal(List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue, long maxModified) {
+        try {
+            LOG.debug("Blob ids to be deleted {}", ids);
+            boolean deleted = blobStore.deleteChunks(ids, getLastMaxModifiedTime(maxModified));
+            if (!deleted) {
+                // Only log and do not add to exception queue since some blobs may not match the
+                // lastMaxModifiedTime criteria.
+                LOG.debug("Some blobs were not deleted from the batch : [{}]", ids);
             }
+        } catch (Exception e) {
+            LOG.warn("Error occurred while deleting blob with ids [{}]", ids, e);
+            exceptionQueue.addAll(ids);
         }
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1694393&r1=1694392&r2=1694393&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java Thu Aug  6 03:36:20 2015
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.plugins.document;
 
 import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
@@ -26,6 +27,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
@@ -158,16 +161,16 @@ public class MongoBlobGCTest extends Abs
                 new ByteArrayInputStream(new byte[0]),
                 REPOSITORY.getNameFromId(repoId));
         }
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
         MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
                 new DocumentBlobReferenceRetriever(store),
-                (GarbageCollectableBlobStore) store.getBlobStore(),
-                MoreExecutors.sameThreadExecutor(),
-                "./target", 5, 0, repoId);
+                (GarbageCollectableBlobStore) store.getBlobStore(), executor, "./target", 5, 0, repoId);
         Thread.sleep(4000);
         gc.collectGarbage(false);
-
+        
+        assertEquals(0, executor.getTaskCount());
         Set<String> existingAfterGC = iterate();
-    boolean empty = Sets.symmetricDifference(remaining, existingAfterGC).isEmpty();
+        boolean empty = Sets.symmetricDifference(remaining, existingAfterGC).isEmpty();
         assertTrue(empty);
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1694393&r1=1694392&r2=1694393&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java Thu Aug  6 03:36:20 2015
@@ -20,6 +20,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
 import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType.REPOSITORY;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
@@ -33,6 +34,8 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
@@ -203,14 +206,15 @@ public class SegmentDataStoreBlobGCTest
                 new ByteArrayInputStream(new byte[0]),
                 REPOSITORY.getNameFromId(repoId));
         }
-
+        
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
         MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
                 new SegmentBlobReferenceRetriever(store.getTracker()),
-                    (GarbageCollectableBlobStore) store.getBlobStore(),
-                    MoreExecutors.sameThreadExecutor(),
+                    (GarbageCollectableBlobStore) store.getBlobStore(), executor,
                     "./target", 2048, 0, repoId);
         gc.collectGarbage(false);
 
+        assertEquals(0, executor.getTaskCount());
         Set<String> existingAfterGC = iterate();
         log.info("{} blobs that should have remained after gc : {}", remaining.size(), remaining);
         log.info("{} blobs existing after gc : {}", existingAfterGC.size(), existingAfterGC);