Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/10/10 15:10:06 UTC

svn commit: r1764116 - in /jackrabbit/oak/trunk/oak-segment-tar/src: main/java/org/apache/jackrabbit/oak/segment/ main/java/org/apache/jackrabbit/oak/segment/compaction/ main/java/org/apache/jackrabbit/oak/segment/file/ test/java/org/apache/jackrabbit/...

Author: mduerig
Date: Mon Oct 10 15:10:06 2016
New Revision: 1764116

URL: http://svn.apache.org/viewvc?rev=1764116&view=rev
Log:
OAK-4617: Align SegmentRevisionGC MBean with new generation based GC
Encapsulate gc functionality in a GarbageCollector subclass and expose a method for cancelling gc
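
For illustration only (not part of this commit): with this change a JMX client
can drive revision gc through the two new operations roughly as sketched below.
The service URL, port and object name pattern are assumptions; only
SegmentRevisionGC and its TYPE constant come from the code in this change.

    import java.util.Set;

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.jackrabbit.oak.segment.compaction.SegmentRevisionGC;

    public class RevisionGCClient {
        public static void main(String[] args) throws Exception {
            // Placeholder address of the JVM hosting the segment store
            JMXServiceURL url = new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection server = connector.getMBeanServerConnection();
                // Assumed name pattern: look the bean up by its TYPE key
                Set<ObjectName> names = server.queryNames(
                        new ObjectName("*:type=" + SegmentRevisionGC.TYPE + ",*"), null);
                for (ObjectName name : names) {
                    SegmentRevisionGC gc =
                            JMX.newMBeanProxy(server, name, SegmentRevisionGC.class);
                    gc.startRevisionGC();   // runs estimation, compaction, cleanup
                    // gc.cancelRevisionGC() aborts a running cycle and is a
                    // no-op when no revision gc is in progress
                }
            }
        }
    }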

Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java Mon Oct 10 15:10:06 2016
@@ -397,7 +397,7 @@ public class SegmentNodeStoreService ext
         registrations.add(registerMBean(
                 whiteboard,
                 SegmentRevisionGC.class,
-                new SegmentRevisionGCMBean(gcOptions),
+                new SegmentRevisionGCMBean(store, gcOptions),
                 SegmentRevisionGC.TYPE,
                 "Segment node store gc options"
         ));

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java Mon Oct 10 15:10:06 2016
@@ -79,8 +79,6 @@ public class SegmentGCOptions {
             "oak.segment.compaction.gcSizeDeltaEstimation",
             SIZE_DELTA_ESTIMATION_DEFAULT);
 
-    private volatile boolean stopCompaction;
-
     public SegmentGCOptions(boolean paused, int gainThreshold, int retryCount, int forceTimeout) {
         this.paused = paused;
         this.gainThreshold = gainThreshold;
@@ -296,13 +294,4 @@ public class SegmentGCOptions {
         return this;
     }
 
-    public boolean isStopCompaction() {
-        return stopCompaction;
-    }
-
-    public boolean setStopCompaction(boolean stop) {
-        this.stopCompaction = stop;
-        return stop;
-    }
-
 }
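
With the stop flag gone from SegmentGCOptions, callers that previously raised it
now cancel through the store instead. A minimal sketch of the migration,
assuming a FileStore reference named fileStore is in scope:

    // Before this commit: raise a flag polled by the compaction loop
    //   gcOptions.setStopCompaction(true);
    // After this commit: cancel explicitly on the FileStore
    fileStore.cancelGC();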

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java Mon Oct 10 15:10:06 2016
@@ -102,7 +102,13 @@ public interface SegmentRevisionGC {
     void setGcSizeDeltaEstimation(long gcSizeDeltaEstimation);
 
     /**
-     * Raise the flag to signal compaction to stop as soon as possible.
+     * Initiate a revision garbage collection operation.
      */
-    void stopCompaction();
+    void startRevisionGC();
+
+    /**
+     * Cancel a running revision garbage collection operation. Does nothing
+     * if revision garbage collection is not running.
+     */
+    void cancelRevisionGC();
 }
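
Note that startRevisionGC() delegates to the FileStore's gc runner and executes
the collection cycle on the calling thread, so cancellation has to come from
another thread. A sketch of that interplay; the executor and the sleep are
purely illustrative:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.jackrabbit.oak.segment.compaction.SegmentRevisionGC;

    class RevisionGCExample {
        static void runAndCancel(final SegmentRevisionGC revisionGC) throws Exception {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            Future<?> gc = executor.submit(new Runnable() {
                @Override
                public void run() {
                    revisionGC.startRevisionGC();  // blocks until the cycle ends
                }
            });
            Thread.sleep(1000);                    // let the cycle get under way
            revisionGC.cancelRevisionGC();         // no-op if already finished
            gc.get();                              // propagate any failure
            executor.shutdown();
        }
    }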

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java Mon Oct 10 15:10:06 2016
@@ -19,18 +19,30 @@
 
 package org.apache.jackrabbit.oak.segment.compaction;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+
+import javax.annotation.Nonnull;
+
 import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
 
 // FIXME OAK-4617: Align SegmentRevisionGC MBean with new generation based GC
 public class SegmentRevisionGCMBean
         extends AnnotatedStandardMBean
         implements SegmentRevisionGC {
 
+    @Nonnull
+    private final FileStore fileStore;
+
+    @Nonnull
     private final SegmentGCOptions gcOptions;
 
-    public SegmentRevisionGCMBean(SegmentGCOptions gcOptions) {
+    public SegmentRevisionGCMBean(@Nonnull FileStore fileStore, @Nonnull SegmentGCOptions gcOptions) {
         super(SegmentRevisionGC.class);
-        this.gcOptions = gcOptions;
+        this.fileStore = checkNotNull(fileStore);
+        this.gcOptions = checkNotNull(gcOptions);
     }
 
     @Override
@@ -94,8 +106,13 @@ public class SegmentRevisionGCMBean
     }
 
     @Override
-    public void stopCompaction() {
-        gcOptions.setStopCompaction(true);
+    public void startRevisionGC() {
+        fileStore.getGCRunner().run();
+    }
+
+    @Override
+    public void cancelRevisionGC() {
+        fileStore.cancelGC();
     }
 
 }
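
Outside an OSGi container the reworked constructor wires up roughly as below.
This is a sketch: defaultGCOptions() and withGCOptions() are assumed from
oak-segment-tar's SegmentGCOptions and FileStoreBuilder and are not part of
this diff.

    import java.io.File;

    import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
    import org.apache.jackrabbit.oak.segment.compaction.SegmentRevisionGC;
    import org.apache.jackrabbit.oak.segment.compaction.SegmentRevisionGCMBean;
    import org.apache.jackrabbit.oak.segment.file.FileStore;

    import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;

    class MBeanWiring {
        static SegmentRevisionGC newRevisionGCMBean(File directory) throws Exception {
            SegmentGCOptions gcOptions = SegmentGCOptions.defaultGCOptions();
            FileStore store = fileStoreBuilder(directory)
                    .withGCOptions(gcOptions)
                    .build();
            // The MBean now takes the store itself so that startRevisionGC()
            // and cancelRevisionGC() can reach the GarbageCollector
            return new SegmentRevisionGCMBean(store, gcOptions);
        }
    }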

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Mon Oct 10 15:10:06 2016
@@ -63,7 +63,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -169,6 +168,9 @@ public class FileStore implements Segmen
 
     private final boolean memoryMapping;
 
+    @Nonnull
+    private final GarbageCollector garbageCollector;
+
     private volatile List<TarReader> readers;
 
     private volatile TarWriter tarWriter;
@@ -185,15 +187,6 @@ public class FileStore implements Segmen
      */
     private final Scheduler fileStoreScheduler = new Scheduler("FileStore background tasks");
 
-    private final SegmentGCOptions gcOptions;
-
-    private final GCJournal gcJournal;
-
-    /**
-     * Semaphore guarding overlapping calls to {@link #compact()} and {@link #cleanup()}
-     */
-    private final Semaphore gcSemaphore = new Semaphore(1);
-
     /**
      * List of old tar file generations that are waiting to be removed. They can
      * not be removed immediately, because they first need to be closed, and the
@@ -202,11 +195,6 @@ public class FileStore implements Segmen
     private final FileReaper fileReaper = new FileReaper();
 
     /**
-     * {@code GcListener} listening to this instance's gc progress
-     */
-    private final GCListener gcListener;
-
-    /**
      * This flag is periodically updated by calling the {@code SegmentGCOptions}
      * at regular intervals.
      */
@@ -235,7 +223,7 @@ public class FileStore implements Segmen
     };
 
     // FIXME OAK-4450: Properly split the FileStore into read-only and r/w variants
-    FileStore(FileStoreBuilder builder, final boolean readOnly) throws InvalidFileStoreVersionException, IOException {
+    FileStore(final FileStoreBuilder builder, boolean readOnly) throws InvalidFileStoreVersionException, IOException {
         this.directory = builder.getDirectory();
         if (!readOnly) {
             lockFile = new RandomAccessFile(new File(directory, LOCK_FILE_NAME), "rw");
@@ -261,7 +249,6 @@ public class FileStore implements Segmen
         }, blobStore, builder.getStringCacheSize(), builder.getTemplateCacheSize());
 
         this.binaryReferenceConsumer = new BinaryReferenceConsumer() {
-
             @Override
             public void consume(int generation, UUID segmentId, String binaryReference) {
                 fileStoreLock.writeLock().lock();
@@ -271,7 +258,6 @@ public class FileStore implements Segmen
                     fileStoreLock.writeLock().unlock();
                 }
             }
-
         };
 
         this.segmentWriter = segmentWriterBuilder("sys")
@@ -286,9 +272,7 @@ public class FileStore implements Segmen
                 .build(this);
         this.maxFileSize = builder.getMaxFileSize() * MB;
         this.memoryMapping = builder.getMemoryMapping();
-        this.gcListener = builder.getGcListener();
-        this.gcOptions = builder.getGcOptions();
-        this.gcJournal = new GCJournal(directory);
+        this.garbageCollector = new GarbageCollector(builder.getGcOptions(), builder.getGcListener(), new GCJournal(directory));
 
         Map<Integer, Map<Character, File>> map = collectFiles(directory);
 
@@ -350,9 +334,10 @@ public class FileStore implements Segmen
                     });
             fileStoreScheduler.scheduleAtFixedRate(
                     format("TarMK disk space check [%s]", directory), 1, MINUTES, new Runnable() {
+                        SegmentGCOptions gcOptions = builder.getGcOptions();
                 @Override
                 public void run() {
-                    checkDiskSpace();
+                    checkDiskSpace(gcOptions);
                 }
             });
         }
@@ -481,95 +466,12 @@ public class FileStore implements Segmen
                 try {
                     gc();
                 } catch (IOException e) {
-                    log.error("Error running compaction", e);
+                    log.error("Error running revision garbage collection", e);
                 }
             }
         });
     }
 
-    /**
-     * Run garbage collection: estimation, compaction, cleanup
-     * @throws IOException
-     */
-    public void gc() throws IOException {
-        gcListener.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet());
-        Stopwatch watch = Stopwatch.createStarted();
-
-        int gainThreshold = gcOptions.getGainThreshold();
-        boolean sufficientEstimatedGain = true;
-        if (gainThreshold <= 0) {
-            gcListener.info("TarMK GC #{}: estimation skipped because gain threshold value ({} <= 0)",
-                    GC_COUNT, gainThreshold);
-        } else if (gcOptions.isPaused()) {
-            gcListener.info("TarMK GC #{}: estimation skipped because compaction is paused", GC_COUNT);
-        } else {
-            gcListener.info("TarMK GC #{}: estimation started", GC_COUNT);
-            Supplier<Boolean> cancel = newCancelCompactionCondition();
-            GCEstimation estimate = estimateCompactionGain(cancel);
-            if (cancel.get()) {
-                gcListener.info("TarMK GC #{}: estimation interrupted: {}. Skipping compaction.", GC_COUNT, cancel);
-            }
-
-            sufficientEstimatedGain = estimate.gcNeeded();
-            String gcLog = estimate.gcLog();
-            if (sufficientEstimatedGain) {
-                gcListener.info(
-                        "TarMK GC #{}: estimation completed in {} ({} ms). {}",
-                        GC_COUNT, watch, watch.elapsed(MILLISECONDS), gcLog);
-            } else {
-                gcListener.skipped(
-                        "TarMK GC #{}: estimation completed in {} ({} ms). {}",
-                        GC_COUNT, watch, watch.elapsed(MILLISECONDS), gcLog);
-            }
-        }
-
-        if (sufficientEstimatedGain) {
-            if (!gcOptions.isPaused()) {
-                logAndClear(segmentWriter.getNodeWriteTimeStats(), segmentWriter.getNodeCompactTimeStats());
-                log(segmentWriter.getNodeCacheOccupancyInfo());
-                Runnable cleanupTask = compact();
-                if (cleanupTask != null) {
-                    cleanupTask.run();
-                }
-                logAndClear(segmentWriter.getNodeWriteTimeStats(), segmentWriter.getNodeCompactTimeStats());
-                log(segmentWriter.getNodeCacheOccupancyInfo());
-            } else {
-                gcListener.skipped("TarMK GC #{}: compaction paused", GC_COUNT);
-            }
-        }
-    }
-
-    private static void logAndClear(
-            @Nonnull DescriptiveStatistics nodeWriteTimeStats,
-            @Nonnull DescriptiveStatistics nodeCompactTimeStats) {
-        log.info("Node write time statistics (ns) {}", toString(nodeWriteTimeStats));
-        log.info("Node compact time statistics (ns) {}", toString(nodeCompactTimeStats));
-        nodeWriteTimeStats.clear();
-        nodeCompactTimeStats.clear();
-    }
-
-    private static void log(@CheckForNull String nodeCacheOccupancyInfo) {
-        if (nodeCacheOccupancyInfo != null) {
-            log.info("NodeCache occupancy: {}", nodeCacheOccupancyInfo);
-        }
-    }
-
-    private static String toString(DescriptiveStatistics statistics) {
-        DecimalFormat sci = new DecimalFormat("##0.0E0");
-        DecimalFormatSymbols symbols = sci.getDecimalFormatSymbols();
-        symbols.setNaN("NaN");
-        symbols.setInfinity("Inf");
-        sci.setDecimalFormatSymbols(symbols);
-        return "min=" + sci.format(statistics.getMin()) +
-                ", 10%=" + sci.format(statistics.getPercentile(10.0)) +
-                ", 50%=" + sci.format(statistics.getPercentile(50.0)) +
-                ", 90%=" + sci.format(statistics.getPercentile(90.0)) +
-                ", max=" + sci.format(statistics.getMax()) +
-                ", mean=" + sci.format(statistics.getMean()) +
-                ", stdev=" + sci.format(statistics.getStandardDeviation()) +
-                ", N=" + sci.format(statistics.getN());
-    }
-
     static Map<Integer, Map<Character, File>> collectFiles(File directory) {
         Map<Integer, Map<Character, File>> dataFiles = newHashMap();
         Map<Integer, File> bulkFiles = newHashMap();
@@ -695,35 +597,6 @@ public class FileStore implements Segmen
         }
     }
 
-    /**
-     * Estimated compaction gain. The result will be undefined if stopped through
-     * the passed {@code stop} signal.
-     * @param stop  signal for stopping the estimation process.
-     * @return compaction gain estimate
-     */
-    GCEstimation estimateCompactionGain(Supplier<Boolean> stop) {
-        if (gcOptions.isGcSizeDeltaEstimation()) {
-            SizeDeltaGcEstimation e = new SizeDeltaGcEstimation(gcOptions,
-                    gcJournal, stats.getApproximateSize());
-            return e;
-        }
-
-        CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(),
-                count(), stop, gcOptions.getGainThreshold());
-        fileStoreLock.readLock().lock();
-        try {
-            for (TarReader reader : readers) {
-                reader.accept(estimate);
-                if (stop.get()) {
-                    break;
-                }
-            }
-        } finally {
-            fileStoreLock.readLock().unlock();
-        }
-        return estimate;
-    }
-
     public FileStoreStats getStats() {
         return stats;
     }
@@ -735,209 +608,47 @@ public class FileStore implements Segmen
                 segmentWriter.flush();
                 tarWriter.flush();
                 stats.flushed();
-                
+
                 return null;
             }
         });
     }
 
     /**
-     * Try to acquire the passed {@code semaphore}
-     * @param semaphore
-     * @return  a closable for releasing the {@code semaphore}
-     * @throws  IllegalStateException if acquiring the {@code semaphore} failed.
-     */
-    private static Closeable withSemaphore(@Nonnull final Semaphore semaphore) {
-        if (!semaphore.tryAcquire()) {
-            throw new IllegalStateException("Compaction or cleanup already in progress");
-        }
-        return new Closeable() {
-            @Override
-            public void close() {
-                semaphore.release();
-            }
-        };
-    }
-
-    /**
-     * Run garbage collection on the segment level: reclaim those data segments
-     * that are from an old segment generation and those bulk segments that are not
-     * reachable anymore.
-     * Those tar files that shrink by at least 25% are rewritten to a new tar generation
-     * skipping the reclaimed segments.
+     * Run garbage collection: estimation, compaction, cleanup
+     * @throws IOException
      */
-    public void cleanup() throws IOException {
-        fileReaper.add(cleanupOldGenerations(getGcGeneration()));
+    public void gc() throws IOException {
+        garbageCollector.run();
     }
 
     /**
-     * Cleanup segments that are from an old generation. That segments whose generation
-     * is {@code gcGeneration - SegmentGCOptions.getRetainedGenerations()} or older.
-     * @param gcGeneration
-     * @return list of files to be removed
-     * @throws IOException
+     * Run the compaction gain estimation process.
+     * @return  compaction gain estimate
      */
-    private List<File> cleanupOldGenerations(int gcGeneration) throws IOException {
-        final int reclaimGeneration = gcGeneration - gcOptions.getRetainedGenerations();
-
-        Predicate<Integer> reclaimPredicate = new Predicate<Integer>() {
-            @Override
-            public boolean apply(Integer generation) {
-                return generation <= reclaimGeneration;
-            }
-        };
-        return cleanup(reclaimPredicate,
-            "gc-count=" + GC_COUNT +
-            ",gc-status=success" +
-            ",store-generation=" + gcGeneration +
-            ",reclaim-predicate=(generation<=" + reclaimGeneration + ")");
+    public GCEstimation estimateCompactionGain() {
+        return garbageCollector.estimateCompactionGain(Suppliers.ofInstance(false));
     }
 
     /**
-     * Cleanup segments of the given generation {@code gcGeneration}.
-     * @param gcGeneration
-     * @return list of files to be removed
-     * @throws IOException
+     * Copy every referenced record in data (non-bulk) segments. Bulk segments
+     * are fully kept (they are only removed in cleanup, if there is no
+     * reference to them).
+     * @return {@code true} on success, {@code false} otherwise.
      */
-    private List<File> cleanupGeneration(final int gcGeneration) throws IOException {
-        Predicate<Integer> cleanupPredicate = new Predicate<Integer>() {
-            @Override
-            public boolean apply(Integer generation) {
-                return generation == gcGeneration;
-            }
-        };
-        return cleanup(cleanupPredicate,
-            "gc-count=" + GC_COUNT +
-            ",gc-status=failed" +
-            ",store-generation=" + (gcGeneration - 1) +
-            ",reclaim-predicate=(generation==" + gcGeneration + ")");
+    public boolean compact() throws IOException {
+        return garbageCollector.compact() > 0;
     }
 
     /**
-     * Cleanup segments whose generation matches the {@code reclaimGeneration} predicate.
-     * @param reclaimGeneration
-     * @param gcInfo  gc information to be passed to {@link SegmentIdTable#clearSegmentIdTables(Set, String)}
-     * @return list of files to be removed
-     * @throws IOException
+     * Run garbage collection on the segment level: reclaim those data segments
+     * that are from an old segment generation and those bulk segments that are not
+     * reachable anymore.
+     * Those tar files that shrink by at least 25% are rewritten to a new tar generation
+     * skipping the reclaimed segments.
      */
-    private List<File> cleanup(
-            @Nonnull Predicate<Integer> reclaimGeneration,
-            @Nonnull String gcInfo)
-    throws IOException {
-        try (Closeable s = withSemaphore(gcSemaphore)) {
-            Stopwatch watch = Stopwatch.createStarted();
-            Set<UUID> bulkRefs = newHashSet();
-            Map<TarReader, TarReader> cleaned = newLinkedHashMap();
-
-            long initialSize = 0;
-            fileStoreLock.writeLock().lock();
-            try {
-                gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT);
-
-                newWriter();
-                segmentCache.clear();
-
-                // Suggest to the JVM that now would be a good time
-                // to clear stale weak references in the SegmentTracker
-                System.gc();
-
-                collectBulkReferences(bulkRefs);
-
-                for (TarReader reader : readers) {
-                    cleaned.put(reader, reader);
-                    initialSize += reader.size();
-                }
-            } finally {
-                fileStoreLock.writeLock().unlock();
-            }
-
-            gcListener.info("TarMK GC #{}: current repository size is {} ({} bytes)",
-                    GC_COUNT, humanReadableByteCount(initialSize), initialSize);
-
-            Set<UUID> reclaim = newHashSet();
-            for (TarReader reader : cleaned.keySet()) {
-                reader.mark(bulkRefs, reclaim, reclaimGeneration);
-                log.info("{}: size of bulk references/reclaim set {}/{}",
-                        reader, bulkRefs.size(), reclaim.size());
-                if (shutdown) {
-                    gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
-                    break;
-                }
-            }
-            Set<UUID> reclaimed = newHashSet();
-            for (TarReader reader : cleaned.keySet()) {
-                cleaned.put(reader, reader.sweep(reclaim, reclaimed));
-                if (shutdown) {
-                    gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
-                    break;
-                }
-            }
-
-            // it doesn't account for concurrent commits that might have happened
-            long afterCleanupSize = 0;
-
-            List<TarReader> oldReaders = newArrayList();
-            fileStoreLock.writeLock().lock();
-            try {
-                // Replace current list of reader with the cleaned readers taking care not to lose
-                // any new reader that might have come in through concurrent calls to newWriter()
-                List<TarReader> sweptReaders = newArrayList();
-                for (TarReader reader : readers) {
-                    if (cleaned.containsKey(reader)) {
-                        TarReader newReader = cleaned.get(reader);
-                        if (newReader != null) {
-                            sweptReaders.add(newReader);
-                            afterCleanupSize += newReader.size();
-                        }
-                        // if these two differ, the former represents the swept version of the latter
-                        if (newReader != reader) {
-                            oldReaders.add(reader);
-                        }
-                    } else {
-                        sweptReaders.add(reader);
-                    }
-                }
-                readers = sweptReaders;
-            } finally {
-                fileStoreLock.writeLock().unlock();
-            }
-            tracker.clearSegmentIdTables(reclaimed, gcInfo);
-
-            // Close old readers *after* setting readers to the new readers to avoid accessing
-            // a closed reader from readSegment()
-            LinkedList<File> toRemove = newLinkedList();
-            for (TarReader oldReader : oldReaders) {
-                closeAndLogOnFail(oldReader);
-                File file = oldReader.getFile();
-                gcListener.info("TarMK GC #{}: cleanup marking file for deletion: {}", GC_COUNT, file.getName());
-                toRemove.addLast(file);
-            }
-
-            long finalSize = size();
-            long reclaimedSize = initialSize - afterCleanupSize;
-            stats.reclaimed(reclaimedSize);
-            gcJournal.persist(reclaimedSize, finalSize);
-            gcListener.cleaned(reclaimedSize, finalSize);
-            gcListener.info("TarMK GC #{}: cleanup completed in {} ({} ms). Post cleanup size is {} ({} bytes)" +
-                            " and space reclaimed {} ({} bytes).",
-                    GC_COUNT, watch, watch.elapsed(MILLISECONDS),
-                    humanReadableByteCount(finalSize), finalSize,
-                    humanReadableByteCount(reclaimedSize), reclaimedSize);
-            return toRemove;
-        }
-    }
-
-    private void collectBulkReferences(Set<UUID> bulkRefs) {
-        Set<UUID> refs = newHashSet();
-        for (SegmentId id : tracker.getReferencedSegmentIds()) {
-            refs.add(id.asUUID());
-        }
-        tarWriter.collectReferences(refs);
-        for (UUID ref : refs) {
-            if (!isDataSegmentId(ref.getLeastSignificantBits())) {
-                bulkRefs.add(ref);
-            }
-        }
+    public void cleanup() throws IOException {
+        garbageCollector.cleanup();
     }
 
     /**
@@ -953,209 +664,15 @@ public class FileStore implements Segmen
      * @param collector  reference collector called back for each blob reference found
      */
     public void collectBlobReferences(ReferenceCollector collector) throws IOException {
-        segmentWriter.flush();
-        List<TarReader> tarReaders = newArrayList();
-        fileStoreLock.writeLock().lock();
-        try {
-            newWriter();
-            tarReaders.addAll(this.readers);
-        } finally {
-            fileStoreLock.writeLock().unlock();
-        }
-
-        int minGeneration = getGcGeneration() - gcOptions.getRetainedGenerations() + 1;
-        for (TarReader tarReader : tarReaders) {
-            tarReader.collectBlobReferences(collector, newReferenceReader(this), minGeneration);
-        }
-    }
-
-    /**
-     * Returns the cancellation policy for the compaction phase.
-     * @return a supplier indicating if compaction should be canceled.
-     */
-    private Supplier<Boolean> newCancelCompactionCondition() {
-        return new CancelCompactionSupplier(this);
+        garbageCollector.collectBlobReferences(collector);
     }
 
     /**
-     * @param duration
-     * @param unit
-     * @return  {@code Supplier} instance which returns true once the time specified in
-     * {@code duration} and {@code unit} has passed.
+     * Cancel a running revision garbage collection operation as soon as possible.
+     * Does nothing if revision garbage collection is not running.
      */
-    private static Supplier<Boolean> timeOut(final long duration, @Nonnull final TimeUnit unit) {
-        return new Supplier<Boolean>() {
-            long deadline = currentTimeMillis() + MILLISECONDS.convert(duration, unit);
-            @Override
-            public Boolean get() {
-                return currentTimeMillis() > deadline;
-            }
-        };
-    }
-
-    /**
-     * @param supplier1
-     * @param supplier2
-     * @return {@code Supplier} instance that returns {@code true} iff {@code supplier1} returns
-     * {@code true} or otherwise {@code supplier2} returns {@code true}.
-     */
-    private static Supplier<Boolean> or(
-            @Nonnull Supplier<Boolean> supplier1,
-            @Nonnull Supplier<Boolean> supplier2) {
-        if (supplier1.get()) {
-            return Suppliers.ofInstance(true);
-        } else {
-            return supplier2;
-        }
-    }
-
-    /**
-     * Copy every referenced record in data (non-bulk) segments. Bulk segments
-     * are fully kept (they are only removed in cleanup, if there is no
-     * reference to them).
-     * @return A {@code Runnable} for cleaning up or {@code null} when compact failed.
-     */
-    @CheckForNull
-    public Runnable compact() throws IOException {
-        try (Closeable s = withSemaphore(gcSemaphore)) {
-            Stopwatch watch = Stopwatch.createStarted();
-            gcListener.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
-
-            SegmentNodeState before = getHead();
-            final int newGeneration = getGcGeneration() + 1;
-            SegmentBufferWriter bufferWriter = new SegmentBufferWriter(this, tracker, segmentReader, "c", newGeneration);
-            Supplier<Boolean> cancel = newCancelCompactionCondition();
-            SegmentNodeState after = compact(bufferWriter, before, cancel);
-            if (after == null) {
-                gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel);
-                return null;
-            }
-
-            gcListener.info("TarMK GC #{}: compacted {} to {}",
-                    GC_COUNT, before.getRecordId(), after.getRecordId());
-
-            int cycles = 0;
-            boolean success = false;
-            while (cycles < gcOptions.getRetryCount() &&
-                    !(success = revisions.setHead(before.getRecordId(), after.getRecordId(), EXPEDITE_OPTION))) {
-                // Some other concurrent changes have been made.
-                // Rebase (and compact) those changes on top of the
-                // compacted state before retrying to set the head.
-                cycles++;
-                gcListener.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
-                    "Compacting these commits. Cycle {} of {}",
-                    GC_COUNT, cycles, gcOptions.getRetryCount());
-                SegmentNodeState head = getHead();
-                after = compact(bufferWriter, head, cancel);
-                if (after == null) {
-                    gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel);
-                    return null;
-                }
-
-                gcListener.info("TarMK GC #{}: compacted {} against {} to {}",
-                        GC_COUNT, head.getRecordId(), before.getRecordId(), after.getRecordId());
-                before = head;
-            }
-
-            if (!success) {
-                gcListener.info("TarMK GC #{}: compaction gave up compacting concurrent commits after {} cycles.",
-                        GC_COUNT, cycles);
-                int forceTimeout = gcOptions.getForceTimeout();
-                if (forceTimeout > 0) {
-                    gcListener.info("TarMK GC #{}: trying to force compact remaining commits for {} seconds",
-                        GC_COUNT, forceTimeout);
-                    cycles++;
-                    success = forceCompact(bufferWriter, or(cancel, timeOut(forceTimeout, SECONDS)));
-                    if (!success) {
-                        if (cancel.get()) {
-                            gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
-                                    "Compaction was cancelled: {}.", GC_COUNT, cancel);
-                        } else {
-                            gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
-                                    "Most likely compaction didn't get exclusive access to the store.", GC_COUNT);
-                        }
-                    }
-                }
-            }
-
-            if (success) {
-                gcListener.compacted(SUCCESS, newGeneration);
-                gcListener.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles",
-                        GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles);
-                return new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            fileReaper.add(cleanupOldGenerations(newGeneration));
-                        } catch (IOException e) {
-                            gcListener.error("TarMK GC #" + GC_COUNT + ": cleanup failed", e);
-                        }
-                    }
-                };
-            } else {
-                gcListener.compacted(FAILURE, newGeneration);
-                gcListener.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles",
-                        GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles);
-                return new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            gcListener.info("TarMK GC #{}: cleaning up after failed compaction", GC_COUNT);
-                            fileReaper.add(cleanupGeneration(newGeneration));
-                        } catch (IOException e) {
-                            gcListener.error("TarMK GC #" + GC_COUNT + ": cleanup failed", e);
-                        }
-                    }
-                };
-            }
-        } catch (InterruptedException e) {
-            gcListener.error("TarMK GC #" + GC_COUNT + ": compaction interrupted", e);
-            currentThread().interrupt();
-            return null;
-        } catch (Exception e) {
-            gcListener.error("TarMK GC #" + GC_COUNT + ": compaction encountered an error", e);
-            return null;
-        }
-    }
-
-    private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState head,
-                                     Supplier<Boolean> cancel)
-    throws IOException {
-        if (gcOptions.isOffline()) {
-            SegmentWriter writer = new SegmentWriter(this, segmentReader, blobStore, new Default(), bufferWriter, binaryReferenceConsumer);
-            return new Compactor(segmentReader, writer, blobStore, cancel, gcOptions)
-                    .compact(EMPTY_NODE, head, EMPTY_NODE);
-        } else {
-            return segmentWriter.writeNode(head, bufferWriter, cancel);
-        }
-    }
-
-    private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter,
-                                 @Nonnull final Supplier<Boolean> cancel)
-    throws InterruptedException {
-        return revisions.
-            setHead(new Function<RecordId, RecordId>() {
-                @Nullable
-                @Override
-                public RecordId apply(RecordId base) {
-                    try {
-                        long t0 = currentTimeMillis();
-                        SegmentNodeState after = compact(bufferWriter,
-                                segmentReader.readNode(base), cancel);
-                        if (after == null) {
-                            gcListener.info("TarMK GC #{}: compaction cancelled after {} seconds",
-                                GC_COUNT, (currentTimeMillis() - t0) / 1000);
-                            return null;
-                        } else {
-                            return after.getRecordId();
-                        }
-                    } catch (IOException e) {
-                        gcListener.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e);
-                        return null;
-                    }
-                }
-            },
-            timeout(gcOptions.getForceTimeout(), SECONDS));
+    public void cancelGC() {
+        garbageCollector.cancel();
     }
 
     public Iterable<SegmentId> getSegmentIds() {
@@ -1504,7 +1021,7 @@ public class FileStore implements Segmen
         return emptyMap();
     }
 
-    private void checkDiskSpace() {
+    private void checkDiskSpace(SegmentGCOptions gcOptions) {
         long repositoryDiskSpace = size();
         long availableDiskSpace = directory.getFreeSpace();
         boolean updated = gcOptions.isDiskSpaceSufficient(repositoryDiskSpace, availableDiskSpace);
@@ -1619,7 +1136,7 @@ public class FileStore implements Segmen
         }
 
         @Override
-        public Runnable compact() {
+        public boolean compact() {
             throw new UnsupportedOperationException("Read Only Store");
         }
 
@@ -1640,61 +1157,541 @@ public class FileStore implements Segmen
         }
     }
 
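+    /**
+     * Encapsulates the gc behaviour (estimation, compaction, cleanup) of this
+     * store. Its synchronized methods take over the role of the semaphore that
+     * previously guarded overlapping calls to compact() and cleanup().
+     */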
+    private class GarbageCollector {
+        @Nonnull
+        private final SegmentGCOptions gcOptions;
 
-    /**
-     * Represents the cancellation policy for the compaction phase. If the disk
-     * space was considered insufficient at least once during compaction (or if
-     * the space was never sufficient to begin with), compaction is considered
-     * canceled. Furthermore when the file store is shutting down, compaction is
-     * considered canceled.
-     */
-    private static class CancelCompactionSupplier implements Supplier<Boolean> {
+        /**
+         * {@code GcListener} listening to this instance's gc progress
+         */
+        @Nonnull
+        private final GCListener gcListener;
 
-        private static enum REASON {
-            UNKNOWN, DISK_SPACE, SHUTDOWN, MANUAL
-        };
+        @Nonnull
+        private final GCJournal gcJournal;
+
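+        // Raised via cancel() to abort a running gc cycle as early as possible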
+        private volatile boolean cancelled;
+
+        GarbageCollector(@Nonnull SegmentGCOptions gcOptions,
+                         @Nonnull GCListener gcListener,
+                         @Nonnull GCJournal gcJournal) {
+            this.gcOptions = gcOptions;
+            this.gcListener = gcListener;
+            this.gcJournal = gcJournal;
+        }
+
+        synchronized void run() throws IOException {
+            gcListener.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet());
+            Stopwatch watch = Stopwatch.createStarted();
 
-        private REASON reason = REASON.UNKNOWN;
+            int gainThreshold = gcOptions.getGainThreshold();
+            boolean sufficientEstimatedGain = true;
+            if (gainThreshold <= 0) {
+                gcListener.info("TarMK GC #{}: estimation skipped because gain threshold value ({} <= 0)",
+                        GC_COUNT, gainThreshold);
+            } else if (gcOptions.isPaused()) {
+                gcListener.info("TarMK GC #{}: estimation skipped because compaction is paused", GC_COUNT);
+            } else {
+                gcListener.info("TarMK GC #{}: estimation started", GC_COUNT);
+                Supplier<Boolean> cancel = new CancelCompactionSupplier(FileStore.this);
+                GCEstimation estimate = estimateCompactionGain(cancel);
+                if (cancel.get()) {
+                    gcListener.info("TarMK GC #{}: estimation interrupted: {}. Skipping compaction.", GC_COUNT, cancel);
+                    return;
+                }
 
-        private final FileStore store;
+                sufficientEstimatedGain = estimate.gcNeeded();
+                String gcLog = estimate.gcLog();
+                if (sufficientEstimatedGain) {
+                    gcListener.info(
+                            "TarMK GC #{}: estimation completed in {} ({} ms). {}",
+                            GC_COUNT, watch, watch.elapsed(MILLISECONDS), gcLog);
+                } else {
+                    gcListener.skipped(
+                            "TarMK GC #{}: estimation completed in {} ({} ms). {}",
+                            GC_COUNT, watch, watch.elapsed(MILLISECONDS), gcLog);
+                }
+            }
 
-        public CancelCompactionSupplier(FileStore store) {
-            this.store = store;
-            this.store.gcOptions.setStopCompaction(false);
+            if (sufficientEstimatedGain) {
+                if (!gcOptions.isPaused()) {
+                    logAndClear(segmentWriter.getNodeWriteTimeStats(), segmentWriter.getNodeCompactTimeStats());
+                    log(segmentWriter.getNodeCacheOccupancyInfo());
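+                    // compact() encodes its outcome in the sign of the result:
+                    // the new generation on success, the negated generation on
+                    // failure, 0 when cancelled or on error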
+                    int gen = compact();
+                    if (gen > 0) {
+                        fileReaper.add(cleanupOldGenerations(gen));
+                    } else if (gen < 0) {
+                        gcListener.info("TarMK GC #{}: cleaning up after failed compaction", GC_COUNT);
+                        fileReaper.add(cleanupGeneration(-gen));
+                    }
+                    logAndClear(segmentWriter.getNodeWriteTimeStats(), segmentWriter.getNodeCompactTimeStats());
+                    log(segmentWriter.getNodeCacheOccupancyInfo());
+                } else {
+                    gcListener.skipped("TarMK GC #{}: compaction paused", GC_COUNT);
+                }
+            }
         }
 
-        @Override
-        public Boolean get() {
-            // The outOfDiskSpace and shutdown flags can only transition from
-            // false (their initial
-            // values), to true. Once true, there should be no way to go back.
-            if (!store.sufficientDiskSpace.get()) {
-                reason = REASON.DISK_SPACE;
-                return true;
+        /**
+         * Estimated compaction gain. The result will be undefined if stopped through
+         * the passed {@code stop} signal.
+         * @param stop  signal for stopping the estimation process.
+         * @return compaction gain estimate
+         */
+        synchronized GCEstimation estimateCompactionGain(Supplier<Boolean> stop) {
+            if (gcOptions.isGcSizeDeltaEstimation()) {
+                SizeDeltaGcEstimation e = new SizeDeltaGcEstimation(gcOptions,
+                        gcJournal, stats.getApproximateSize());
+                return e;
             }
-            if (store.shutdown) {
-                reason = REASON.SHUTDOWN;
-                return true;
+
+            CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(),
+                    count(), stop, gcOptions.getGainThreshold());
+            fileStoreLock.readLock().lock();
+            try {
+                for (TarReader reader : readers) {
+                    reader.accept(estimate);
+                    if (stop.get()) {
+                        break;
+                    }
+                }
+            } finally {
+                fileStoreLock.readLock().unlock();
             }
-            if (store.gcOptions.isStopCompaction()) {
-                reason = REASON.MANUAL;
-                return true;
+            return estimate;
+        }
+
+        private void logAndClear(
+                @Nonnull DescriptiveStatistics nodeWriteTimeStats,
+                @Nonnull DescriptiveStatistics nodeCompactTimeStats) {
+            log.info("Node write time statistics (ns) {}", toString(nodeWriteTimeStats));
+            log.info("Node compact time statistics (ns) {}", toString(nodeCompactTimeStats));
+            nodeWriteTimeStats.clear();
+            nodeCompactTimeStats.clear();
+        }
+
+        private void log(@CheckForNull String nodeCacheOccupancyInfo) {
+            if (nodeCacheOccupancyInfo != null) {
+                log.info("NodeCache occupancy: {}", nodeCacheOccupancyInfo);
             }
-            return false;
         }
 
-        @Override
-        public String toString() {
-            switch (reason) {
-            case DISK_SPACE:
-                return "Not enough disk space available";
-            case SHUTDOWN:
-                return "FileStore shutdown request received";
-            case MANUAL:
-                return "GC stop request received";
-            default:
-                return "";
+        private String toString(DescriptiveStatistics statistics) {
+            DecimalFormat sci = new DecimalFormat("##0.0E0");
+            DecimalFormatSymbols symbols = sci.getDecimalFormatSymbols();
+            symbols.setNaN("NaN");
+            symbols.setInfinity("Inf");
+            sci.setDecimalFormatSymbols(symbols);
+            return "min=" + sci.format(statistics.getMin()) +
+                    ", 10%=" + sci.format(statistics.getPercentile(10.0)) +
+                    ", 50%=" + sci.format(statistics.getPercentile(50.0)) +
+                    ", 90%=" + sci.format(statistics.getPercentile(90.0)) +
+                    ", max=" + sci.format(statistics.getMax()) +
+                    ", mean=" + sci.format(statistics.getMean()) +
+                    ", stdev=" + sci.format(statistics.getStandardDeviation()) +
+                    ", N=" + sci.format(statistics.getN());
+        }
+
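+        /**
+         * Copy every referenced record in the data segments into a new
+         * generation.
+         * @return  the new generation on success, the negated new generation
+         *          when compaction failed and 0 when it was cancelled or an
+         *          error occurred.
+         */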
+        synchronized int compact() throws IOException {
+            try {
+                Stopwatch watch = Stopwatch.createStarted();
+                gcListener.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
+
+                SegmentNodeState before = getHead();
+                final int newGeneration = getGcGeneration() + 1;
+                SegmentBufferWriter bufferWriter = new SegmentBufferWriter(FileStore.this, tracker, segmentReader, "c", newGeneration);
+                Supplier<Boolean> cancel = new CancelCompactionSupplier(FileStore.this);
+                SegmentNodeState after = compact(bufferWriter, before, cancel);
+                if (after == null) {
+                    gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel);
+                    return 0;
+                }
+
+                gcListener.info("TarMK GC #{}: compacted {} to {}",
+                        GC_COUNT, before.getRecordId(), after.getRecordId());
+
+                int cycles = 0;
+                boolean success = false;
+                while (cycles < gcOptions.getRetryCount() &&
+                        !(success = revisions.setHead(before.getRecordId(), after.getRecordId(), EXPEDITE_OPTION))) {
+                    // Some other concurrent changes have been made.
+                    // Rebase (and compact) those changes on top of the
+                    // compacted state before retrying to set the head.
+                    cycles++;
+                    gcListener.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
+                                    "Compacting these commits. Cycle {} of {}",
+                            GC_COUNT, cycles, gcOptions.getRetryCount());
+                    SegmentNodeState head = getHead();
+                    after = compact(bufferWriter, head, cancel);
+                    if (after == null) {
+                        gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel);
+                        return 0;
+                    }
+
+                    gcListener.info("TarMK GC #{}: compacted {} against {} to {}",
+                            GC_COUNT, head.getRecordId(), before.getRecordId(), after.getRecordId());
+                    before = head;
+                }
+
+                if (!success) {
+                    gcListener.info("TarMK GC #{}: compaction gave up compacting concurrent commits after {} cycles.",
+                            GC_COUNT, cycles);
+                    int forceTimeout = gcOptions.getForceTimeout();
+                    if (forceTimeout > 0) {
+                        gcListener.info("TarMK GC #{}: trying to force compact remaining commits for {} seconds",
+                                GC_COUNT, forceTimeout);
+                        cycles++;
+                        success = forceCompact(bufferWriter, or(cancel, timeOut(forceTimeout, SECONDS)));
+                        if (!success) {
+                            if (cancel.get()) {
+                                gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
+                                        "Compaction was cancelled: {}.", GC_COUNT, cancel);
+                            } else {
+                                gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
+                                        "Most likely compaction didn't get exclusive access to the store.", GC_COUNT);
+                            }
+                        }
+                    }
+                }
+
+                if (success) {
+                    gcListener.compacted(SUCCESS, newGeneration);
+                    gcListener.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles",
+                            GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles);
+                    return newGeneration;
+                } else {
+                    gcListener.compacted(FAILURE, newGeneration);
+                    gcListener.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles",
+                            GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles);
+                    return -newGeneration;
+                }
+            } catch (InterruptedException e) {
+                gcListener.error("TarMK GC #" + GC_COUNT + ": compaction interrupted", e);
+                currentThread().interrupt();
+                return 0;
+            } catch (Exception e) {
+                gcListener.error("TarMK GC #" + GC_COUNT + ": compaction encountered an error", e);
+                return 0;
+            }
+        }
+
+        /**
+         * @param duration
+         * @param unit
+         * @return  {@code Supplier} instance which returns true once the time specified in
+         * {@code duration} and {@code unit} has passed.
+         */
+        private Supplier<Boolean> timeOut(final long duration, @Nonnull final TimeUnit unit) {
+            return new Supplier<Boolean>() {
+                long deadline = currentTimeMillis() + MILLISECONDS.convert(duration, unit);
+                @Override
+                public Boolean get() {
+                    return currentTimeMillis() > deadline;
+                }
+            };
+        }
+
+        /**
+         * @param supplier1
+         * @param supplier2
+         * @return {@code Supplier} instance that returns {@code true} iff {@code supplier1} returns
+         * {@code true} or otherwise {@code supplier2} returns {@code true}.
+         */
+        private Supplier<Boolean> or(
+                @Nonnull Supplier<Boolean> supplier1,
+                @Nonnull Supplier<Boolean> supplier2) {
+            if (supplier1.get()) {
+                return Suppliers.ofInstance(true);
+            } else {
+                return supplier2;
+            }
+        }
+
+        private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState head,
+                                         Supplier<Boolean> cancel)
+        throws IOException {
+            if (gcOptions.isOffline()) {
+                SegmentWriter writer = new SegmentWriter(FileStore.this, segmentReader, blobStore, new Default(), bufferWriter, binaryReferenceConsumer);
+                return new Compactor(segmentReader, writer, blobStore, cancel, gcOptions)
+                        .compact(EMPTY_NODE, head, EMPTY_NODE);
+            } else {
+                return segmentWriter.writeNode(head, bufferWriter, cancel);
+            }
+        }
+
+        private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter,
+                                     @Nonnull final Supplier<Boolean> cancel)
+        throws InterruptedException {
+            return revisions.
+                    setHead(new Function<RecordId, RecordId>() {
+                                @Nullable
+                                @Override
+                                public RecordId apply(RecordId base) {
+                                    try {
+                                        long t0 = currentTimeMillis();
+                                        SegmentNodeState after = compact(bufferWriter,
+                                                segmentReader.readNode(base), cancel);
+                                        if (after == null) {
+                                            gcListener.info("TarMK GC #{}: compaction cancelled after {} seconds",
+                                                    GC_COUNT, (currentTimeMillis() - t0) / 1000);
+                                            return null;
+                                        } else {
+                                            return after.getRecordId();
+                                        }
+                                    } catch (IOException e) {
+                                        gcListener.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e);
+                                        return null;
+                                    }
+                                }
+                            },
+                            timeout(gcOptions.getForceTimeout(), SECONDS));
+        }
+
+        synchronized void cleanup() throws IOException {
+            fileReaper.add(cleanupOldGenerations(getGcGeneration()));
+        }
+
+        /**
+         * Cleanup segments that are from an old generation, that is, segments whose
+         * generation is {@code gcGeneration - SegmentGCOptions.getRetainedGenerations()} or older.
+         * @param gcGeneration
+         * @return list of files to be removed
+         * @throws IOException
+         */
+        private List<File> cleanupOldGenerations(int gcGeneration) throws IOException {
+            final int reclaimGeneration = gcGeneration - gcOptions.getRetainedGenerations();
+
+            Predicate<Integer> reclaimPredicate = new Predicate<Integer>() {
+                @Override
+                public boolean apply(Integer generation) {
+                    return generation <= reclaimGeneration;
+                }
+            };
+            return cleanup(reclaimPredicate,
+                    "gc-count=" + GC_COUNT +
+                            ",gc-status=success" +
+                            ",store-generation=" + gcGeneration +
+                            ",reclaim-predicate=(generation<=" + reclaimGeneration + ")");
+        }
+
+        /**
+         * Cleanup segments whose generation matches the {@code reclaimGeneration} predicate.
+         * @param reclaimGeneration
+         * @param gcInfo  gc information to be passed to {@link SegmentIdTable#clearSegmentIdTables(Set, String)}
+         * @return list of files to be removed
+         * @throws IOException
+         */
+        private List<File> cleanup(
+                @Nonnull Predicate<Integer> reclaimGeneration,
+                @Nonnull String gcInfo)
+        throws IOException {
+            Stopwatch watch = Stopwatch.createStarted();
+            Set<UUID> bulkRefs = newHashSet();
+            Map<TarReader, TarReader> cleaned = newLinkedHashMap();
+
+            long initialSize = 0;
+            fileStoreLock.writeLock().lock();
+            try {
+                gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT);
+
+                newWriter();
+                segmentCache.clear();
+
+                // Suggest to the JVM that now would be a good time
+                // to clear stale weak references in the SegmentTracker
+                System.gc();
+
+                collectBulkReferences(bulkRefs);
+
+                for (TarReader reader : readers) {
+                    cleaned.put(reader, reader);
+                    initialSize += reader.size();
+                }
+            } finally {
+                fileStoreLock.writeLock().unlock();
+            }
+
+            gcListener.info("TarMK GC #{}: current repository size is {} ({} bytes)",
+                    GC_COUNT, humanReadableByteCount(initialSize), initialSize);
+
+            Set<UUID> reclaim = newHashSet();
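+            // Mark phase: each reader determines which of its segments are
+            // reclaimable under the generation predicate, honouring the references
+            // to bulk segments collected above.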
+            for (TarReader reader : cleaned.keySet()) {
+                reader.mark(bulkRefs, reclaim, reclaimGeneration);
+                log.info("{}: size of bulk references/reclaim set {}/{}",
+                        reader, bulkRefs.size(), reclaim.size());
+                if (shutdown) {
+                    gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
+                    break;
+                }
+            }
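+            // Sweep phase: rewrite each tar file without the reclaimable segments.
+            // sweep() may return the same reader (nothing to reclaim) or null when
+            // the whole file can go (see the null check below).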
+            Set<UUID> reclaimed = newHashSet();
+            for (TarReader reader : cleaned.keySet()) {
+                cleaned.put(reader, reader.sweep(reclaim, reclaimed));
+                if (shutdown) {
+                    gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
+                    break;
+                }
+            }
+
+            // Size of the swept readers only; this doesn't account for concurrent
+            // commits that might have happened in the meantime
+            long afterCleanupSize = 0;
+
+            List<TarReader> oldReaders = newArrayList();
+            fileStoreLock.writeLock().lock();
+            try {
+                // Replace the current list of readers with the cleaned readers, taking care
+                // not to lose any new reader that might have come in through concurrent calls
+                // to newWriter()
+                List<TarReader> sweptReaders = newArrayList();
+                for (TarReader reader : readers) {
+                    if (cleaned.containsKey(reader)) {
+                        TarReader newReader = cleaned.get(reader);
+                        if (newReader != null) {
+                            sweptReaders.add(newReader);
+                            afterCleanupSize += newReader.size();
+                        }
+                        // if these two differ, the former represents the swept version of the latter
+                        if (newReader != reader) {
+                            oldReaders.add(reader);
+                        }
+                    } else {
+                        sweptReaders.add(reader);
+                    }
+                }
+                readers = sweptReaders;
+            } finally {
+                fileStoreLock.writeLock().unlock();
+            }
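+            // Purge the reclaimed segment ids from the tracker's id tables so that
+            // stale SegmentId instances no longer resolve to reclaimed segments.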
+            tracker.clearSegmentIdTables(reclaimed, gcInfo);
+
+            // Close old readers *after* setting readers to the new readers to avoid accessing
+            // a closed reader from readSegment()
+            LinkedList<File> toRemove = newLinkedList();
+            for (TarReader oldReader : oldReaders) {
+                closeAndLogOnFail(oldReader);
+                File file = oldReader.getFile();
+                gcListener.info("TarMK GC #{}: cleanup marking file for deletion: {}", GC_COUNT, file.getName());
+                toRemove.addLast(file);
+            }
+
+            long finalSize = size();
+            long reclaimedSize = initialSize - afterCleanupSize;
+            stats.reclaimed(reclaimedSize);
+            gcJournal.persist(reclaimedSize, finalSize);
+            gcListener.cleaned(reclaimedSize, finalSize);
+            gcListener.info("TarMK GC #{}: cleanup completed in {} ({} ms). Post cleanup size is {} ({} bytes)" +
+                            " and space reclaimed {} ({} bytes).",
+                    GC_COUNT, watch, watch.elapsed(MILLISECONDS),
+                    humanReadableByteCount(finalSize), finalSize,
+                    humanReadableByteCount(reclaimedSize), reclaimedSize);
+            return toRemove;
+        }
+
+        private void collectBulkReferences(Set<UUID> bulkRefs) {
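+            // Collect the ids of all currently referenced segments and keep the
+            // bulk segment ids among them: bulk segments carry no gc generation,
+            // so the mark phase retains them by reference rather than by generation.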
+            Set<UUID> refs = newHashSet();
+            for (SegmentId id : tracker.getReferencedSegmentIds()) {
+                refs.add(id.asUUID());
+            }
+            tarWriter.collectReferences(refs);
+            for (UUID ref : refs) {
+                if (!isDataSegmentId(ref.getLeastSignificantBits())) {
+                    bulkRefs.add(ref);
+                }
             }
         }
+
+        /**
+         * Cleanup segments of the given generation {@code gcGeneration}.
+         * @param gcGeneration  the generation whose segments are to be cleaned up
+         * @return list of files to be removed
+         * @throws IOException
+         */
+        private List<File> cleanupGeneration(final int gcGeneration) throws IOException {
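+            // Rolls back a failed or cancelled compaction: exactly the segments of
+            // the generation that was being written are removed again.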
+            Predicate<Integer> cleanupPredicate = new Predicate<Integer>() {
+                @Override
+                public boolean apply(Integer generation) {
+                    return generation == gcGeneration;
+                }
+            };
+            return cleanup(cleanupPredicate,
+                    "gc-count=" + GC_COUNT +
+                            ",gc-status=failed" +
+                            ",store-generation=" + (gcGeneration - 1) +
+                            ",reclaim-predicate=(generation==" + gcGeneration + ")");
+        }
+
+        /**
+         * Finds all external blob references that are currently accessible
+         * in this repository and adds them to the given collector. Useful
+         * for collecting garbage in an external data store.
+         * <p>
+         * Note that this method only collects blob references that are already
+         * stored in the repository (at the time when this method is called), so
+         * the garbage collector will need some other mechanism for tracking
+         * in-memory references and references stored while this method is
+         * running.
+         * @param collector  reference collector called back for each blob reference found
+         */
+        synchronized void collectBlobReferences(ReferenceCollector collector) throws IOException {
+            segmentWriter.flush();
+            List<TarReader> tarReaders = newArrayList();
+            fileStoreLock.writeLock().lock();
+            try {
+                newWriter();
+                tarReaders.addAll(FileStore.this.readers);
+            } finally {
+                fileStoreLock.writeLock().unlock();
+            }
+
+            int minGeneration = getGcGeneration() - gcOptions.getRetainedGenerations() + 1;
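+            // Only scan generations that survive the next cleanup. For example, with
+            // gcGeneration = 7 and retainedGenerations = 2 (illustrative values),
+            // minGeneration is 6, so generations 6 and newer are scanned.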
+            for (TarReader tarReader : tarReaders) {
+                tarReader.collectBlobReferences(collector, newReferenceReader(FileStore.this), minGeneration);
+            }
+        }
+
+        void cancel() {
+            cancelled = true;
+        }
+
+        /**
+         * Represents the cancellation policy for the compaction phase. If the disk
+         * space was considered insufficient at least once during compaction (or if
+         * the space was never sufficient to begin with), compaction is considered
+         * cancelled. Compaction is also considered cancelled when the file store is
+         * shutting down or when cancellation was explicitly requested by the user.
+         */
+        private class CancelCompactionSupplier implements Supplier<Boolean> {
+            private final FileStore store;
+
+            private String reason;
+
+            public CancelCompactionSupplier(@Nonnull FileStore store) {
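+                // A new supplier is created for each compaction run, which resets
+                // any cancellation requested during a previous run.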
+                cancelled = false;
+                this.store = store;
+            }
+
+            @Override
+            public Boolean get() {
+                // The outOfDiskSpace and shutdown flags can only transition from
+                // false (their initial values), to true. Once true, there should
+                // be no way to go back.
+                if (!store.sufficientDiskSpace.get()) {
+                    reason = "Not enough disk space";
+                    return true;
+                }
+                if (store.shutdown) {
+                    reason = "The FileStore is shutting down";
+                    return true;
+                }
+                if (cancelled) {
+                    reason = "Cancelled by user";
+                    return true;
+                }
+                return false;
+            }
+
+            @Override
+            public String toString() { return reason; }
+        }
     }
+
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java Mon Oct 10 15:10:06 2016
@@ -529,7 +529,7 @@ public class CompactionAndCleanupIT {
             public Boolean call() throws IOException {
                 boolean cancelled = false;
                 for (int k = 0; !cancelled && k < 1000; k++) {
-                    cancelled = fileStore.compact() == null;
+                    cancelled = !fileStore.compact();
                 }
                 return cancelled;
             }
@@ -1080,10 +1080,7 @@ public class CompactionAndCleanupIT {
             Callable<Void> concurrentCleanupTask = new Callable<Void>() {
                 @Override
                 public Void call() throws Exception {
-                    // Concurrent cleanup calls are not supported by the file store
-                    synchronized (fileStore) {
-                        fileStore.cleanup();
-                    }
+                    fileStore.cleanup();
                     return null;
                 }
             };
@@ -1143,10 +1140,7 @@ public class CompactionAndCleanupIT {
             final Callable<Void> concurrentCleanTask = new Callable<Void>() {
                 @Override
                 public Void call() throws Exception {
-                    // Concurrent cleanup calls are not supported by the file store
-                    synchronized (fileStore) {
-                        fileStore.cleanup();
-                    }
+                    fileStore.cleanup();
                     return null;
                 }
             };

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java Mon Oct 10 15:10:06 2016
@@ -236,7 +236,7 @@ public class SegmentCompactionIT {
         List<Registration> registrations = newArrayList();
         registrations.add(registerMBean(segmentCompactionMBean,
                 new ObjectName("IT:TYPE=Segment Compaction")));
-        registrations.add(registerMBean(new SegmentRevisionGCMBean(gcOptions),
+        registrations.add(registerMBean(new SegmentRevisionGCMBean(fileStore, gcOptions),
                 new ObjectName("IT:TYPE=Segment Revision GC")));
         registrations.add(registerMBean(fileStoreGCMonitor,
                 new ObjectName("IT:TYPE=GC Monitor")));

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java Mon Oct 10 15:10:06 2016
@@ -27,7 +27,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Random;
 
-import com.google.common.base.Suppliers;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.segment.SegmentNodeStore;
 import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
@@ -80,13 +79,11 @@ public class CompactionEstimatorTest {
 
         fileStore.flush();
         try {
-            GCEstimation est = fileStore.estimateCompactionGain(Suppliers
-                    .ofInstance(false));
+            GCEstimation est = fileStore.estimateCompactionGain();
             assertTrue(est.gcNeeded());
             if (est instanceof CompactionGainEstimate) {
                 // should be at 66%
-                assertTrue(((CompactionGainEstimate) est)
-                        .estimateCompactionGain() > 60);
+                assertTrue(((CompactionGainEstimate) est).estimateCompactionGain() > 60);
             }
         } finally {
             fileStore.close();