You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2014/04/01 11:39:32 UTC

svn commit: r1583583 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/blob/ main/java/org/apache/jackrabbit/oak/plugins/document/ main/java/org/apache/jackrabbit/oak/plugins/segment/ test/java/org/apache/jackrabbit/o...

Author: chetanm
Date: Tue Apr  1 09:39:31 2014
New Revision: 1583583

URL: http://svn.apache.org/r1583583
Log:
OAK-1639 - MarkSweepGarbageCollector improvements

Refactoring the MarkSweepGarbageCollector.
-- Replaced creation of internal executor with explicitly passed executor
-- Document/Segment stores now create a new instance of
    MarkSweepGarbageCollector for every invocation

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/GarbageCollectorFileState.java Tue Apr  1 09:39:31 2014
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.blob;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Comparator;
@@ -31,55 +32,36 @@ import org.apache.jackrabbit.oak.commons
  * Also, manages any temporary files needed as well as external sorting.
  * 
  */
-class GarbageCollectorFileState {
-
-    private static final String GC_DIR = "gc";
-
-    private static final String MARKED_PREFIX = "marked";
-
-    private static final String AVAIL_PREFIX = "avail";
-
-    private static final String GC_CANDIDATE_PREFIX = "gccand";
-
-    private static final String GC_PREFIX = "gc";
-
-    /** The startTime which records the starting time. */
-    private long startTime;
-
+class GarbageCollectorFileState implements Closeable{
     /** The root of the gc file state directory. */
-    private File home;
+    private final File home;
 
     /** The marked references. */
-    private File markedRefs;
+    private final File markedRefs;
 
     /** The available references. */
-    private File availableRefs;
+    private final File availableRefs;
 
     /** The gc candidates. */
-    private File gcCandidates;
+    private final File gcCandidates;
 
     /** The garbage stores the garbage collection candidates which were not deleted . */
-    private File garbage;
+    private final File garbage;
 
     /**
      * Instantiates a new garbage collector file state.
      * 
-     * @param root
-     *            the root
-     * @throws IOException
-     *             Signals that an I/O exception has occurred.
+     * @param root path of the root directory under which the
+     *             files created during gc are stored
      */
     public GarbageCollectorFileState(String root) throws IOException {
-        init(root);
-    }
-
-    /**
-     * Gets the home directory.
-     * 
-     * @return the home
-     */
-    protected File getHome() {
-        return home;
+        long startTime = System.currentTimeMillis();
+        home = new File(root, "gcworkdir-" + startTime);
+        markedRefs = new File(home, "marked-" + startTime);
+        availableRefs = new File(home,"avail-" + startTime);
+        gcCandidates = new File(home, "gccand-" + startTime);
+        garbage = new File(home, "gc-" + startTime);
+        FileUtils.forceMkdir(home);
     }
 
     /**
@@ -87,8 +69,8 @@ class GarbageCollectorFileState {
      * 
      * @return the marked references
      */
-    protected File getMarkedRefs() {
-        return createMarkedRefsFile();
+    public File getMarkedRefs() {
+        return markedRefs;
     }
 
     /**
@@ -96,8 +78,8 @@ class GarbageCollectorFileState {
      * 
      * @return the available references
      */
-    protected File getAvailableRefs() {
-        return createAvailableRefsFile();
+    public File getAvailableRefs() {
+        return availableRefs;
     }
 
     /**
@@ -105,8 +87,8 @@ class GarbageCollectorFileState {
      * 
      * @return the gc candidates
      */
-    protected File getGcCandidates() {
-        return createGcCandidatesFile();
+    public File getGcCandidates() {
+        return gcCandidates;
     }
 
     /**
@@ -114,100 +96,17 @@ class GarbageCollectorFileState {
      * 
      * @return the garbage
      */
-    protected File getGarbage() {
-        return createGarbageFile();
-    }
-
-    /**
-     * Initialize the state.
-     * 
-     * @param root
-     *            the root
-     * @throws IOException
-     *             Signals that an I/O exception has occurred.
-     */
-    private void init(String root) throws IOException {
-        startTime = System.currentTimeMillis();
-
-        home = new File(root, GC_DIR);
-        FileUtils.forceMkdir(home);
-        home.deleteOnExit();
-    }
-
-    /**
-     * Creates the marked references file.
-     * 
-     * @return the file
-     */
-    private File createMarkedRefsFile() {
-        if (markedRefs == null) {
-            markedRefs = new File(home,
-                    MARKED_PREFIX + "-" + startTime);
-            markedRefs.deleteOnExit();
-        }
-        return markedRefs;
-    }
-
-    /**
-     * Creates the available references file.
-     * 
-     * @return the file
-     */
-    private File createAvailableRefsFile() {
-        if (availableRefs == null) {
-            availableRefs = new File(home,
-                    AVAIL_PREFIX + "-" + startTime);
-            availableRefs.deleteOnExit();
-        }
-        return availableRefs;
-    }
-
-    /**
-     * Creates the gc candidates file.
-     * 
-     * @return the file
-     */
-    private File createGcCandidatesFile() {
-        if (gcCandidates == null) {
-            gcCandidates = new File(home,
-                    GC_CANDIDATE_PREFIX + "-" + startTime);
-            gcCandidates.deleteOnExit();
-        }
-        return gcCandidates;
-    }
-
-    /**
-     * Creates the garbage file.
-     * 
-     * @return the file
-     */
-    private File createGarbageFile() {
-        if (garbage == null) {
-            garbage = new File(home,
-                    GC_PREFIX + "-" + startTime);
-            garbage.deleteOnExit();
-        }
+    public File getGarbage() {
         return garbage;
     }
 
     /**
-     * Creates a temp file.
-     * 
-     * @return the file
-     * @throws IOException
-     *             Signals that an I/O exception has occurred.
-     */
-    protected File createTempFile() throws IOException {
-        return File.createTempFile("temp", null, home);
-    }
-
-    /**
      * Completes the process by deleting the files.
      * 
      * @throws IOException
      *             Signals that an I/O exception has occurred.
      */
-    protected void complete() throws IOException {
+    public void close() throws IOException {
         if (!getGarbage().exists() ||
                 FileUtils.sizeOf(getGarbage()) == 0) {
             FileUtils.deleteDirectory(home);
@@ -217,10 +116,7 @@ class GarbageCollectorFileState {
     /**
      * Sorts the given file externally.
      * 
-     * @param file
-     *            the file
-     * @throws IOException
-     *             Signals that an I/O exception has occurred.
+     * @param file file whose contents need to be sorted
      */
     public void sort(File file) throws IOException {
         File sorted = createTempFile();
@@ -235,4 +131,8 @@ class GarbageCollectorFileState {
                 sorted, lexComparator, true);
         Files.move(sorted, file);
     }
+
+    private File createTempFile() throws IOException {
+        return File.createTempFile("temp", null, home);
+    }
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java Tue Apr  1 09:39:31 2014
@@ -26,23 +26,22 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.base.StandardSystemProperty;
 import com.google.common.base.Stopwatch;
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
 import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.LineIterator;
 import org.apache.jackrabbit.oak.commons.IOUtils;
@@ -54,6 +53,7 @@ import org.slf4j.LoggerFactory;
  * Mark and sweep garbage collector.
  * 
  * Uses the file system to store internal state while in process to account for huge data.
+ * This class is not thread safe.
  * 
  */
 public class MarkSweepGarbageCollector implements BlobGarbageCollector {
@@ -66,127 +66,58 @@ public class MarkSweepGarbageCollector i
 
     public static final int DEFAULT_BATCH_COUNT = 2048;
 
-    public static final String NOT_RUNNING = "NotRunning";
-
-    public static final String MARKING = "Running-Marking";
-
-    public static final String SWEEPING = "Running-Sweeping";
+    public static enum State {NOT_RUNNING, MARKING, SWEEPING}
 
     /** The last modified time before current time of blobs to consider for garbage collection. */
-    private long maxLastModifiedInterval = TimeUnit.HOURS.toMillis(24);
+    private final long maxLastModifiedInterval;
 
     /** Run concurrently when possible. */
-    private boolean runConcurrently = true;
-
-    /** The number of sweeper threads to use. */
-    private int numSweepers = 1;
+    private final boolean runConcurrently;
 
     /** The blob store to be garbage collected. */
-    private GarbageCollectableBlobStore blobStore;
+    private final GarbageCollectableBlobStore blobStore;
 
     /** Helper class to mark blob references which **/
-    private BlobReferenceRetriever marker;
+    private final BlobReferenceRetriever marker;
     
     /** The garbage collector file state */
-    private GarbageCollectorFileState fs;
+    private final GarbageCollectorFileState fs;
 
-    /** The configured root to store gc process files. */
-    private String root = TEMP_DIR;
+    private final Executor executor;
 
     /** The batch count. */
-    private int batchCount = DEFAULT_BATCH_COUNT;
+    private final int batchCount;
 
     /** Flag to indicate the state of the gc **/
-    private String state = NOT_RUNNING;
-
-    /**
-     * Gets the max last modified interval considered for garbage collection.
-     * 
-     * @return the max last modified interval
-     */
-    protected long getMaxLastModifiedInterval() {
-        return maxLastModifiedInterval;
-    }
+    private State state = State.NOT_RUNNING;
 
     /**
-     * Sets the max last modified interval considered for garbage collection.
-     * 
-     * @param maxLastModifiedInterval the new max last modified interval
-     */
-    protected void setMaxLastModifiedInterval(long maxLastModifiedInterval) {
-        this.maxLastModifiedInterval = maxLastModifiedInterval;
-    }
-    
-    /**
-     * Gets the root.
-     * 
-     * @return the root
-     */
-    protected String getRoot() {
-        return root;
-    }
-
-    /**
-     * Gets the batch count.
-     * 
-     * @return the batch count
-     */
-    protected int getBatchCount() {
-        return batchCount;
-    }
-
-    /**
-     * Checks if run concurrently.
-     * 
-     * @return true, if is run concurrently
-     */
-    protected boolean isRunConcurrently() {
-        return runConcurrently;
-    }
-
-    /**
-     * Gets the number sweepers.
-     * 
-     * @return the number sweepers
-     */
-    protected int getNumSweepers() {
-        return numSweepers;
-    }
-
-    /**
-     * Gets the state of the gc process.
-     * 
-     * @return the state
-     */
-    protected String getState() {
-        return state;
-    }
-
-    /**
-     * @param marker
+     * Creates an instance of MarkSweepGarbageCollector
+     *
+     * @param marker BlobReferenceRetriever instance used to fetch referenced blob entries
      * @param blobStore
-     * @param root the root
-     * @param batchCount the batch count
+     * @param root absolute path of the root directory under which temporary
+     *             files are created
+     * @param batchCount batch size used for saving intermediate state
      * @param runBackendConcurrently - run the backend iterate concurrently
-     * @param maxSweeperThreads the max sweeper threads
-     * @param maxLastModifiedInterval
+     * @param maxLastModifiedInterval - interval in millis before the current time. Only blobs
+     *                                last modified before that point are considered for GC
      * @throws IOException Signals that an I/O exception has occurred.
      */
     public MarkSweepGarbageCollector(
             BlobReferenceRetriever marker,
             GarbageCollectableBlobStore blobStore,
+            Executor executor,
             String root,
             int batchCount,
             boolean runBackendConcurrently,
-            int maxSweeperThreads,
             long maxLastModifiedInterval)
             throws IOException {
+        this.executor = executor;
         this.blobStore = blobStore;
         this.marker = marker;
         this.batchCount = batchCount;
-        this.root = root;
         this.runConcurrently = runBackendConcurrently;
-        this.numSweepers = maxSweeperThreads;
         this.maxLastModifiedInterval = maxLastModifiedInterval;
         fs = new GarbageCollectorFileState(root);        
     }
@@ -200,13 +131,10 @@ public class MarkSweepGarbageCollector i
      */
     public MarkSweepGarbageCollector(
             BlobReferenceRetriever marker, 
-            GarbageCollectableBlobStore blobStore)
+            GarbageCollectableBlobStore blobStore,
+            Executor executor)
             throws IOException {
-        Preconditions.checkState(!Strings.isNullOrEmpty(root));
-
-        this.blobStore = blobStore;
-        this.marker = marker;
-        fs = new GarbageCollectorFileState(root);
+        this(marker, blobStore, executor, TEMP_DIR, DEFAULT_BATCH_COUNT, true, TimeUnit.HOURS.toMillis(24));
     }
 
     @Override
@@ -215,56 +143,63 @@ public class MarkSweepGarbageCollector i
     }
 
     /**
+     * Gets the state of the gc process.
+     *
+     * @return the state
+     */
+    public State getState() {
+        return state;
+    }
+
+    /**
      * Mark and sweep. Main method for GC.
      * 
      * @throws Exception
      *             the exception
      */
-    public void markAndSweep() throws Exception {
+    private void markAndSweep() throws IOException, InterruptedException {
+        boolean threw = true;
         try {
             Stopwatch sw = Stopwatch.createStarted();
             LOG.info("Starting Blob garbage collection");
 
             mark();
             int deleteCount = sweep();
+            threw = false;
 
             LOG.info("Blob garbage collection completed in {}. Number of blobs " +
                     "deleted [{}]", sw.toString(), deleteCount);
         } finally {
-            fs.complete();
-            state = NOT_RUNNING;
+            Closeables.close(fs, threw);
+            state = State.NOT_RUNNING;
         }
     }
 
     /**
      * Mark phase of the GC.
-     * 
-     * @throws Exception
-     *             the exception
      */
-    private void mark() throws Exception {
-        state = MARKING;
+    private void mark() throws IOException, InterruptedException {
+        state = State.MARKING;
         LOG.debug("Starting mark phase of the garbage collector");
 
         // Find all blobs available in the blob store
-        Thread blobIdRetrieverThread = null;
+        ListenableFutureTask<Integer> blobIdRetriever = ListenableFutureTask.create(new BlobIdRetriever());
         if (runConcurrently) {
-            blobIdRetrieverThread = new Thread(new BlobIdRetriever(), 
-                    this.getClass().getSimpleName() + "-MarkThread");
-            blobIdRetrieverThread.setDaemon(true);
-            blobIdRetrieverThread.start();
+            executor.execute(blobIdRetriever);
         } else {
-            (new BlobIdRetriever()).retrieve();
+            MoreExecutors.sameThreadExecutor().execute(blobIdRetriever);
         }
 
         // Find all blob references after iterating over the whole repository
         iterateNodeTree();
 
-        if (runConcurrently) {
-            if (blobIdRetrieverThread.isAlive()) {
-                blobIdRetrieverThread.join();
-            }
+        try {
+            blobIdRetriever.get();
+        } catch (ExecutionException e) {
+           LOG.warn("Error occurred while fetching all the blobIds from the BlobStore. GC would " +
+                   "continue with the blobIds retrieved so far", e.getCause());
         }
+
         difference();
         LOG.debug("Ending mark phase of the garbage collector");
     }
@@ -278,9 +213,9 @@ public class MarkSweepGarbageCollector i
     private void difference() throws IOException {
         LOG.debug("Starting difference phase of the garbage collector");
 
-        FileLineDifferenceIterator<String> iter = new FileLineDifferenceIterator<String>(
+        FileLineDifferenceIterator iter = new FileLineDifferenceIterator(
                 fs.getMarkedRefs(),
-                fs.getAvailableRefs());
+                fs.getAvailableRefs(), batchCount);
 
         BufferedWriter bufferWriter = null;
         try {
@@ -316,87 +251,54 @@ public class MarkSweepGarbageCollector i
      */
     private int sweep() throws IOException {
         int count = 0;
-        try {        
-            state = SWEEPING;        
-            LOG.debug("Starting sweep phase of the garbage collector");
-    
-            ConcurrentLinkedQueue<String> exceptionQueue = new ConcurrentLinkedQueue<String>();
-            ExecutorService executorService =
-                    new ThreadPoolExecutor(getNumSweepers(), getNumSweepers(), 1,
-                            TimeUnit.MINUTES,
-                            new LinkedBlockingQueue<Runnable>(),
-                            new ThreadFactory() {
-                                private final AtomicInteger threadCounter = new AtomicInteger();
-    
-                                private String getName() {
-                                    return "MarkSweepGarbageCollector-Sweeper-" + threadCounter.getAndIncrement();
-                                }
-    
-                                @Override
-                                public Thread newThread(Runnable r) {
-                                    Thread thread = new Thread(r, getName());
-                                    thread.setDaemon(true);
-                                    return thread;
-                                }
-                            });
-    
-            LineIterator iterator = 
-                    FileUtils.lineIterator(fs.getGcCandidates(), Charsets.UTF_8.name());
-            List<String> ids = Lists.newArrayList();
+        state = State.SWEEPING;
+        LOG.debug("Starting sweep phase of the garbage collector");
 
-            while (iterator.hasNext()) {
-                ids.add(iterator.next());
-    
-                if (ids.size() > getBatchCount()) {
-                    count += ids.size();
-                    executorService.execute(new Sweeper(ids, exceptionQueue));
-                    ids = Lists.newArrayList();
-                }
-            }
-            if (!ids.isEmpty()) {
+        ConcurrentLinkedQueue<String> exceptionQueue = new ConcurrentLinkedQueue<String>();
+
+        LineIterator iterator =
+                FileUtils.lineIterator(fs.getGcCandidates(), Charsets.UTF_8.name());
+        List<String> ids = Lists.newArrayList();
+
+        while (iterator.hasNext()) {
+            ids.add(iterator.next());
+
+            if (ids.size() > getBatchCount()) {
                 count += ids.size();
-                executorService.execute(new Sweeper(ids, exceptionQueue));
+                executor.execute(new Sweeper(ids, exceptionQueue));
+                ids = Lists.newArrayList();
             }
-    
-            try {
-                executorService.shutdown();
-                executorService.awaitTermination(100, TimeUnit.MINUTES);
-            } catch (InterruptedException e) {
-                LOG.error("Exception while waiting for termination of the executor service. ExecutorService " +
-                        "would be immediately shutdown", e);
-                executorService.shutdownNow();
-                Thread.currentThread().interrupt();
-            }
-    
-            count -= exceptionQueue.size();
-            BufferedWriter writer = null;
-            try {
-                if (!exceptionQueue.isEmpty()) {
-                    writer = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
-                    saveBatchToFile(Lists.newArrayList(exceptionQueue), writer);
-                }
-            } finally {
-                LineIterator.closeQuietly(iterator);
-                IOUtils.closeQuietly(writer);
+        }
+        if (!ids.isEmpty()) {
+            count += ids.size();
+            executor.execute(new Sweeper(ids, exceptionQueue));
+        }
+
+        count -= exceptionQueue.size();
+        BufferedWriter writer = null;
+        try {
+            if (!exceptionQueue.isEmpty()) {
+                writer = Files.newWriter(fs.getGarbage(), Charsets.UTF_8);
+                saveBatchToFile(Lists.newArrayList(exceptionQueue), writer);
             }
-    
-            LOG.debug("Ending sweep phase of the garbage collector");
         } finally {
-            fs.complete();
-            state = NOT_RUNNING;
+            LineIterator.closeQuietly(iterator);
+            IOUtils.closeQuietly(writer);
+        }
+        if(!exceptionQueue.isEmpty()) {
+            LOG.warn("Unable to delete some blob entries from the blob store. Details around such blob entries " +
+                    "can be found in [{}]", fs.getGarbage().getAbsolutePath());
         }
+        LOG.debug("Ending sweep phase of the garbage collector");
         return count;
     }
 
+    private int getBatchCount() {
+        return batchCount;
+    }
+
     /**
      * Save batch to file.
-     * 
-     * @param ids
-     *            the ids
-     * @param writer
-     *            the writer
-     * @throws IOException
-     *             Signals that an I/O exception has occurred.
      */
     static void saveBatchToFile(List<String> ids, BufferedWriter writer) throws IOException {
         writer.append(Joiner.on(NEWLINE).join(ids));
@@ -416,14 +318,6 @@ public class MarkSweepGarbageCollector i
         /** The ids to sweep. */
         private final List<String> ids;
 
-        /**
-         * Instantiates a new sweeper.
-         * 
-         * @param ids
-         *            the ids
-         * @param exceptionQueue
-         *            the exception queue
-         */
         public Sweeper(List<String> ids, ConcurrentLinkedQueue<String> exceptionQueue) {
             this.exceptionQueue = exceptionQueue;
             this.ids = ids;
@@ -441,30 +335,25 @@ public class MarkSweepGarbageCollector i
                     exceptionQueue.addAll(ids);
                 }
             } catch (Exception e) {
-                LOG.error("Error occurred while deleting blob with ids [{}]", ids, e);
+                LOG.warn("Error occurred while deleting blob with ids [{}]", ids, e);
                 exceptionQueue.addAll(ids);
             }
         }
     }
 
     /**
-     * Iterates the complete node tree.
-     * 
-     * @return the list
-     * @throws Exception
-     *             the exception
+     * Iterates the complete node tree and collects all blob references
      */
-    private void iterateNodeTree() throws Exception {
+    private void iterateNodeTree() throws IOException {
         final BufferedWriter writer = Files.newWriter(fs.getMarkedRefs(), Charsets.UTF_8);
         try {
             marker.collectReferences(
                     new ReferenceCollector() {
-                        private final List<String> idBatch = Lists
-                                .newArrayListWithCapacity(getBatchCount());
+                        private final List<String> idBatch = Lists.newArrayListWithCapacity(getBatchCount());
 
                         private int count = 0;
 
-                        private boolean debugMode = LOG.isTraceEnabled();
+                        private final boolean debugMode = LOG.isTraceEnabled();
 
                         @Override
                         public void addReference(String blobId) {
@@ -514,26 +403,19 @@ public class MarkSweepGarbageCollector i
     /**
      * BlobIdRetriever class to retrieve all blob ids.
      */
-    class BlobIdRetriever implements Runnable {
+    private class BlobIdRetriever implements Callable<Integer> {
         @Override
-        public void run() {
-            retrieve();
-        }
-
-        /**
-         * Retrieve.
-         */
-        protected void retrieve() {
+        public Integer call() throws Exception {
             LOG.debug("Starting retrieve of all blobs");
-
             BufferedWriter bufferWriter = null;
+            int blobsCount = 0;
             try {
                 bufferWriter = new BufferedWriter(
                         new FileWriter(fs.getAvailableRefs()));
                 Iterator<String> idsIter = blobStore.getAllChunkIds(maxLastModifiedInterval);
                 
                 List<String> ids = Lists.newArrayList();
-                int blobsCount = 0;
+
                 while (idsIter.hasNext()) {
                     ids.add(idsIter.next());
                     if (ids.size() > getBatchCount()) {
@@ -550,21 +432,19 @@ public class MarkSweepGarbageCollector i
                 // sort the file
                 fs.sort(fs.getAvailableRefs());
                 LOG.debug("Ending retrieving all blobs : {}", blobsCount);
-            } catch (Exception e) {
-                LOG.error("Error retrieving available blob ids", e);
             } finally {
                 IOUtils.closeQuietly(bufferWriter);
             }
+            return blobsCount;
         }
+
+
     }
 
     /**
      * FileLineDifferenceIterator class which iterates over the difference of 2 files line by line.
-     * 
-     * @param <T>
-     *            the generic type
      */
-    class FileLineDifferenceIterator<T> implements Iterator<String> {
+    static class FileLineDifferenceIterator implements Iterator<String> {
 
         /** The marked references iterator. */
         private final LineIterator markedIter;
@@ -574,6 +454,8 @@ public class MarkSweepGarbageCollector i
 
         private final ArrayDeque<String> queue;
 
+        private final int batchSize;
+
         private boolean done;
 
         /** Temporary buffer. */
@@ -581,19 +463,14 @@ public class MarkSweepGarbageCollector i
 
         /**
          * Instantiates a new file line difference iterator.
-         * 
-         * @param marked
-         *            the marked
-         * @param available
-         *            the available
-         * @throws IOException
-         *             Signals that an I/O exception has occurred.
          */
-        public FileLineDifferenceIterator(File marked, File available) throws IOException {
+        public FileLineDifferenceIterator(File marked, File available, int batchSize) throws IOException {
             this.markedIter = FileUtils.lineIterator(marked);
             this.allIter = FileUtils.lineIterator(available);
-            queue = new ArrayDeque<String>(getBatchCount());
+            this.batchSize = batchSize;
+            queue = new ArrayDeque<String>(batchSize);
             markedBuffer = Sets.newTreeSet();
+
         }
 
         /**
@@ -658,16 +535,16 @@ public class MarkSweepGarbageCollector i
             // the
             // blob id set iteration is complete
             while (allIter.hasNext() &&
-                    gcSet.size() < getBatchCount()) {
+                    gcSet.size() < batchSize) {
                 TreeSet<String> allBuffer = new TreeSet<String>();
 
                 while (markedIter.hasNext() &&
-                        markedBuffer.size() < getBatchCount()) {
+                        markedBuffer.size() < batchSize) {
                     String stre = markedIter.next();
                     markedBuffer.add(stre);
                 }
                 while (allIter.hasNext() &&
-                        allBuffer.size() < getBatchCount()) {
+                        allBuffer.size() < batchSize) {
                     String stre = allIter.next();
                     allBuffer.add(stre);
                 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Tue Apr  1 09:39:31 2014
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.plugins.document;
 
 import java.io.InputStream;
+import java.util.concurrent.Executor;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -25,6 +26,7 @@ import javax.sql.DataSource;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.Weigher;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.mongodb.DB;
 
 import org.apache.jackrabbit.mk.api.MicroKernel;
@@ -462,6 +464,7 @@ public class DocumentMK implements Micro
         private long splitDocumentAgeMillis = 5 * 60 * 1000;
         private long offHeapCacheSize = -1;
         private Clock clock = Clock.SIMPLE;
+        private Executor executor;
 
         public Builder() {
             memoryCacheSize(DEFAULT_MEMORY_CACHE_SIZE);
@@ -731,6 +734,30 @@ public class DocumentMK implements Micro
             return this;
         }
 
+        /**
+         * Returns the executor handed to components such as the blob garbage
+         * collector, or a same-thread executor if none has been set.
+         *
+         * @return the configured executor; never {@code null}
+         */
+        public Executor getExecutor() {
+            if (executor == null) {
+                return MoreExecutors.sameThreadExecutor();
+            }
+            return executor;
+        }
+
+        /**
+         * Sets the executor returned by {@link #getExecutor()}.
+         *
+         * @param executor the executor to use; {@code null} means same-thread execution
+         * @return this builder
+         */
+        public Builder setExecutor(Executor executor) {
+            this.executor = executor;
+            return this;
+        }
+
         public Builder clock(Clock clock) {
             this.clock = clock;
             return this;

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Apr  1 09:39:31 2014
@@ -38,6 +38,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -288,7 +289,7 @@ public final class DocumentNodeStore
 
     private final VersionGarbageCollector versionGarbageCollector;
 
-    private final MarkSweepGarbageCollector blobGarbageCollector;
+    private final Executor executor;
 
     public DocumentNodeStore(DocumentMK.Builder builder) {
         this.blobStore = builder.getBlobStore();
@@ -303,6 +304,7 @@ public final class DocumentNodeStore
             s = new LoggingDocumentStoreWrapper(s);
         }
         this.store = s;
+        this.executor = builder.getExecutor();
         this.clock = builder.getClock();
         int cid = builder.getClusterId();
         cid = Integer.getInteger("oak.documentMK.clusterId", cid);
@@ -320,7 +322,6 @@ public final class DocumentNodeStore
         this.branches = new UnmergedBranches(getRevisionComparator());
         this.asyncDelay = builder.getAsyncDelay();
         this.versionGarbageCollector = new VersionGarbageCollector(this);
-        this.blobGarbageCollector = createBlobGarbageCollector(blobStore);
         this.missing = new DocumentNodeState(this, "MISSING", new Revision(0, 0, 0)) {
             @Override
             public int getMemory() {
@@ -1607,19 +1608,17 @@ public final class DocumentNodeStore
      * Creates and returns a MarkSweepGarbageCollector if the current BlobStore
      * supports garbage collection
      *
-     * @param blobStore the blobStore used by DocumentNodeStore
-     *
      * @return garbage collector of the BlobStore supports GC otherwise null
      */
-    @Nullable
-    private MarkSweepGarbageCollector createBlobGarbageCollector(BlobStore blobStore) {
+    @CheckForNull
+    public MarkSweepGarbageCollector createBlobGarbageCollector() {
         MarkSweepGarbageCollector blobGC = null;
         if(blobStore instanceof GarbageCollectableBlobStore){
-            blobGC = null;
             try {
                 blobGC = new MarkSweepGarbageCollector(
                         new DocumentBlobReferenceRetriever(this),
-                            (GarbageCollectableBlobStore) blobStore);
+                            (GarbageCollectableBlobStore) blobStore,
+                        executor);
             } catch (IOException e) {
                 throw new RuntimeException("Error occurred while initializing " +
                         "the MarkSweepGarbageCollector",e);
@@ -1708,9 +1707,4 @@ public final class DocumentNodeStore
     public VersionGarbageCollector getVersionGarbageCollector() {
         return versionGarbageCollector;
     }
-
-    @CheckForNull
-    public MarkSweepGarbageCollector getBlobGarbageCollector() {
-        return blobGarbageCollector;
-    }
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java Tue Apr  1 09:39:31 2014
@@ -48,17 +48,17 @@ import org.apache.jackrabbit.oak.osgi.Ob
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGC;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
-import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.document.cache.CachingDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.state.RevisionGC;
 import org.apache.jackrabbit.oak.spi.state.RevisionGCMBean;
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
-import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.ComponentContext;
@@ -128,6 +128,7 @@ public class DocumentNodeStoreService {
     private DocumentMK mk;
     private ObserverTracker observerTracker;
     private ComponentContext context;
+    private Whiteboard whiteboard;
 
 
     private static final long DEFAULT_VER_GC_MAX_AGE = TimeUnit.DAYS.toSeconds(1);
@@ -140,6 +141,9 @@ public class DocumentNodeStoreService {
     @Activate
     protected void activate(ComponentContext context, Map<String, ?> config) throws Exception {
         this.context = context;
+        this.whiteboard = new OsgiWhiteboard(context.getBundleContext());
+        this.executor = new WhiteboardExecutor();
+        executor.start(whiteboard);
 
         if (blobStore == null &&
                 PropertiesUtil.toBoolean(prop(CUSTOM_BLOB_STORE), false)) {
@@ -191,12 +195,12 @@ public class DocumentNodeStoreService {
         }
 
         mkBuilder.setMongoDB(mongoDB, changesSize);
-
+        mkBuilder.setExecutor(executor);
         mk = mkBuilder.open();
 
         log.info("Connected to database {}", mongoDB);
 
-        registerJMXBeans(mk.getNodeStore(), context.getBundleContext());
+        registerJMXBeans(mk.getNodeStore());
 
         NodeStore store;
         if (useMK) {
@@ -234,12 +238,14 @@ public class DocumentNodeStoreService {
         unregisterNodeStore();
     }
 
+    @SuppressWarnings("UnusedDeclaration")
     protected void bindBlobStore(BlobStore blobStore) throws IOException {
         log.info("Initializing DocumentNodeStore with BlobStore [{}]", blobStore);
         this.blobStore = blobStore;
         registerNodeStore();
     }
 
+    @SuppressWarnings("UnusedDeclaration")
     protected void unbindBlobStore(BlobStore blobStore) {
         this.blobStore = null;
         unregisterNodeStore();
@@ -264,23 +270,22 @@ public class DocumentNodeStoreService {
         }
     }
 
-    private void registerJMXBeans(final DocumentNodeStore store, BundleContext context) throws IOException {
-        Whiteboard wb = new OsgiWhiteboard(context);
+    private void registerJMXBeans(final DocumentNodeStore store) throws IOException {
         registrations.add(
-                registerMBean(wb,
+                registerMBean(whiteboard,
                         CacheStatsMBean.class,
                         store.getNodeCacheStats(),
                         CacheStatsMBean.TYPE,
                         store.getNodeCacheStats().getName()));
         registrations.add(
-                registerMBean(wb,
+                registerMBean(whiteboard,
                         CacheStatsMBean.class,
                         store.getNodeChildrenCacheStats(),
                         CacheStatsMBean.TYPE,
                         store.getNodeChildrenCacheStats().getName())
         );
         registrations.add(
-                registerMBean(wb,
+                registerMBean(whiteboard,
                         CacheStatsMBean.class,
                         store.getDocChildrenCacheStats(),
                         CacheStatsMBean.TYPE,
@@ -290,7 +295,7 @@ public class DocumentNodeStoreService {
         if (cl instanceof MemoryDiffCache) {
             MemoryDiffCache mcl = (MemoryDiffCache) cl;
             registrations.add(
-                    registerMBean(wb,
+                    registerMBean(whiteboard,
                             CacheStatsMBean.class,
                             mcl.getDiffCacheStats(),
                             CacheStatsMBean.TYPE,
@@ -302,7 +307,7 @@ public class DocumentNodeStoreService {
         if (ds instanceof CachingDocumentStore) {
             CachingDocumentStore cds = (CachingDocumentStore) ds;
             registrations.add(
-                    registerMBean(wb,
+                    registerMBean(whiteboard,
                             CacheStatsMBean.class,
                             cds.getCacheStats(),
                             CacheStatsMBean.TYPE,
@@ -310,11 +315,16 @@ public class DocumentNodeStoreService {
             );
         }
 
-        executor = new WhiteboardExecutor();
-        executor.start(wb);
-        MarkSweepGarbageCollector gc = store.getBlobGarbageCollector();
-        if(gc != null){
-            registrations.add(registerMBean(wb, BlobGCMBean.class, new BlobGC(gc, executor),
+        // Check the BlobStore the node store actually uses: the bound 'blobStore'
+        // field is null when no custom BlobStore is configured.
+        if (store.getBlobStore() instanceof GarbageCollectableBlobStore) {
+            BlobGarbageCollector gc = new BlobGarbageCollector() {
+                @Override
+                public void collectGarbage() throws Exception {
+                    store.createBlobGarbageCollector().collectGarbage();
+                }
+            };
+            registrations.add(registerMBean(whiteboard, BlobGCMBean.class, new BlobGC(gc, executor),
                     BlobGCMBean.TYPE, "Document node store blob garbage collection"));
         }
 
@@ -324,7 +334,7 @@ public class DocumentNodeStoreService {
                 store.getVersionGarbageCollector().gc(versionGcMaxAgeInSecs, TimeUnit.SECONDS);
             }
         }, executor);
-        registrations.add(registerMBean(wb, RevisionGCMBean.class, revisionGC,
+        registrations.add(registerMBean(whiteboard, RevisionGCMBean.class, revisionGC,
                 RevisionGCMBean.TYPE, "Document node store revision garbage collection"));
 
         //TODO Register JMX bean for Off Heap Cache stats

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Tue Apr  1 09:39:31 2014
@@ -38,6 +38,7 @@ import org.apache.jackrabbit.oak.osgi.Ob
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGC;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
+import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -174,9 +175,17 @@ public class SegmentNodeStoreService ext
                 RevisionGCMBean.TYPE, "Segment node store revision garbage collection");
 
         if (blobStore instanceof GarbageCollectableBlobStore) {
-            MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
-                    new SegmentBlobReferenceRetriever(store.getTracker()), 
-                        (GarbageCollectableBlobStore) blobStore);
+            BlobGarbageCollector gc = new BlobGarbageCollector() {
+                @Override
+                public void collectGarbage() throws Exception {
+                    // Fresh collector per run; distinct name avoids shadowing the outer 'gc'
+                    MarkSweepGarbageCollector markSweepGc = new MarkSweepGarbageCollector(
+                            new SegmentBlobReferenceRetriever(store.getTracker()),
+                            (GarbageCollectableBlobStore) blobStore,
+                            executor);
+                    markSweepGc.collectGarbage();
+                }
+            };
             blobGCRegistration = registerMBean(whiteboard, BlobGCMBean.class, new BlobGC(gc, executor),
                     BlobGCMBean.TYPE, "Segment node store blob garbage collection");
         }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java Tue Apr  1 09:39:31 2014
@@ -28,6 +28,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.MoreExecutors;
 import junit.framework.Assert;
 
 import com.google.common.collect.Lists;
@@ -122,11 +123,13 @@ public class MongoBlobGCTest extends Abs
         gc(set);
     }
 
-    private void gc(HashSet<String> set) throws IOException, Exception {
+    private void gc(HashSet<String> set) throws Exception {
         DocumentNodeStore store = mk.getNodeStore();
         MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
                 new DocumentBlobReferenceRetriever(store),
-                (GarbageCollectableBlobStore) store.getBlobStore(), "./target", 2048, true, 2, 0);
+                (GarbageCollectableBlobStore) store.getBlobStore(),
+                MoreExecutors.sameThreadExecutor(),
+                "./target", 2048, true, 0);
         gc.collectGarbage();
 
         Set<String> existing = iterate();

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java?rev=1583583&r1=1583582&r2=1583583&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/SegmentDataStoreBlobGCTest.java Tue Apr  1 09:39:31 2014
@@ -33,6 +33,7 @@ import java.util.Set;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.core.data.FileDataStore;
 import org.apache.jackrabbit.oak.api.Blob;
@@ -123,7 +124,9 @@ public class SegmentDataStoreBlobGCTest 
 
         MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(
                 new SegmentBlobReferenceRetriever(store.getTracker()),
-                    (GarbageCollectableBlobStore) store.getBlobStore(), "./target", 2048, true, 2, 0);
+                    (GarbageCollectableBlobStore) store.getBlobStore(),
+                    MoreExecutors.sameThreadExecutor(),
+                    "./target", 2048, true,  0);
         gc.collectGarbage();
 
         Set<String> existing = iterate();