You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/10/10 15:10:06 UTC
svn commit: r1764116 - in /jackrabbit/oak/trunk/oak-segment-tar/src:
main/java/org/apache/jackrabbit/oak/segment/
main/java/org/apache/jackrabbit/oak/segment/compaction/
main/java/org/apache/jackrabbit/oak/segment/file/
test/java/org/apache/jackrabbit/...
Author: mduerig
Date: Mon Oct 10 15:10:06 2016
New Revision: 1764116
URL: http://svn.apache.org/viewvc?rev=1764116&view=rev
Log:
OAK-4617: Align SegmentRevisionGC MBean with new generation based GC
Encapsulate gc functionality in a GarbageCollector inner class of FileStore and expose a method for cancelling gc.
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java Mon Oct 10 15:10:06 2016
@@ -397,7 +397,7 @@ public class SegmentNodeStoreService ext
registrations.add(registerMBean(
whiteboard,
SegmentRevisionGC.class,
- new SegmentRevisionGCMBean(gcOptions),
+ new SegmentRevisionGCMBean(store, gcOptions),
SegmentRevisionGC.TYPE,
"Segment node store gc options"
));
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentGCOptions.java Mon Oct 10 15:10:06 2016
@@ -79,8 +79,6 @@ public class SegmentGCOptions {
"oak.segment.compaction.gcSizeDeltaEstimation",
SIZE_DELTA_ESTIMATION_DEFAULT);
- private volatile boolean stopCompaction;
-
public SegmentGCOptions(boolean paused, int gainThreshold, int retryCount, int forceTimeout) {
this.paused = paused;
this.gainThreshold = gainThreshold;
@@ -296,13 +294,4 @@ public class SegmentGCOptions {
return this;
}
- public boolean isStopCompaction() {
- return stopCompaction;
- }
-
- public boolean setStopCompaction(boolean stop) {
- this.stopCompaction = stop;
- return stop;
- }
-
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGC.java Mon Oct 10 15:10:06 2016
@@ -102,7 +102,13 @@ public interface SegmentRevisionGC {
void setGcSizeDeltaEstimation(long gcSizeDeltaEstimation);
/**
- * Raise the flag to signal compaction to stop as soon as possible.
+ * Initiate a revision garbage collection operation
*/
- void stopCompaction();
+ void startRevisionGC();
+
+ /**
+ * Cancel a running revision garbage collection operation. Does nothing
+ * if revision garbage collection is not running.
+ */
+ void cancelRevisionGC();
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/compaction/SegmentRevisionGCMBean.java Mon Oct 10 15:10:06 2016
@@ -19,18 +19,30 @@
package org.apache.jackrabbit.oak.segment.compaction;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+
+import javax.annotation.Nonnull;
+
import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
// FIXME OAK-4617: Align SegmentRevisionGC MBean with new generation based GC
public class SegmentRevisionGCMBean
extends AnnotatedStandardMBean
implements SegmentRevisionGC {
+ @Nonnull
+ private final FileStore fileStore;
+
+ @Nonnull
private final SegmentGCOptions gcOptions;
- public SegmentRevisionGCMBean(SegmentGCOptions gcOptions) {
+ public SegmentRevisionGCMBean(@Nonnull FileStore fileStore, @Nonnull SegmentGCOptions gcOptions) {
super(SegmentRevisionGC.class);
- this.gcOptions = gcOptions;
+ this.fileStore = checkNotNull(fileStore);
+ this.gcOptions = checkNotNull(gcOptions);
}
@Override
@@ -94,8 +106,13 @@ public class SegmentRevisionGCMBean
}
@Override
- public void stopCompaction() {
- gcOptions.setStopCompaction(true);
+ public void startRevisionGC() {
+ fileStore.getGCRunner().run();
+ }
+
+ @Override
+ public void cancelRevisionGC() {
+ fileStore.cancelGC();
}
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Mon Oct 10 15:10:06 2016
@@ -63,7 +63,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -169,6 +168,9 @@ public class FileStore implements Segmen
private final boolean memoryMapping;
+ @Nonnull
+ private final GarbageCollector garbageCollector;
+
private volatile List<TarReader> readers;
private volatile TarWriter tarWriter;
@@ -185,15 +187,6 @@ public class FileStore implements Segmen
*/
private final Scheduler fileStoreScheduler = new Scheduler("FileStore background tasks");
- private final SegmentGCOptions gcOptions;
-
- private final GCJournal gcJournal;
-
- /**
- * Semaphore guarding overlapping calls to {@link #compact()} and {@link #cleanup()}
- */
- private final Semaphore gcSemaphore = new Semaphore(1);
-
/**
* List of old tar file generations that are waiting to be removed. They can
* not be removed immediately, because they first need to be closed, and the
@@ -202,11 +195,6 @@ public class FileStore implements Segmen
private final FileReaper fileReaper = new FileReaper();
/**
- * {@code GcListener} listening to this instance's gc progress
- */
- private final GCListener gcListener;
-
- /**
* This flag is periodically updated by calling the {@code SegmentGCOptions}
* at regular intervals.
*/
@@ -235,7 +223,7 @@ public class FileStore implements Segmen
};
// FIXME OAK-4450: Properly split the FileStore into read-only and r/w variants
- FileStore(FileStoreBuilder builder, final boolean readOnly) throws InvalidFileStoreVersionException, IOException {
+ FileStore(final FileStoreBuilder builder, boolean readOnly) throws InvalidFileStoreVersionException, IOException {
this.directory = builder.getDirectory();
if (!readOnly) {
lockFile = new RandomAccessFile(new File(directory, LOCK_FILE_NAME), "rw");
@@ -261,7 +249,6 @@ public class FileStore implements Segmen
}, blobStore, builder.getStringCacheSize(), builder.getTemplateCacheSize());
this.binaryReferenceConsumer = new BinaryReferenceConsumer() {
-
@Override
public void consume(int generation, UUID segmentId, String binaryReference) {
fileStoreLock.writeLock().lock();
@@ -271,7 +258,6 @@ public class FileStore implements Segmen
fileStoreLock.writeLock().unlock();
}
}
-
};
this.segmentWriter = segmentWriterBuilder("sys")
@@ -286,9 +272,7 @@ public class FileStore implements Segmen
.build(this);
this.maxFileSize = builder.getMaxFileSize() * MB;
this.memoryMapping = builder.getMemoryMapping();
- this.gcListener = builder.getGcListener();
- this.gcOptions = builder.getGcOptions();
- this.gcJournal = new GCJournal(directory);
+ this.garbageCollector = new GarbageCollector(builder.getGcOptions(), builder.getGcListener(), new GCJournal(directory));
Map<Integer, Map<Character, File>> map = collectFiles(directory);
@@ -350,9 +334,10 @@ public class FileStore implements Segmen
});
fileStoreScheduler.scheduleAtFixedRate(
format("TarMK disk space check [%s]", directory), 1, MINUTES, new Runnable() {
+ SegmentGCOptions gcOptions = builder.getGcOptions();
@Override
public void run() {
- checkDiskSpace();
+ checkDiskSpace(gcOptions);
}
});
}
@@ -481,95 +466,12 @@ public class FileStore implements Segmen
try {
gc();
} catch (IOException e) {
- log.error("Error running compaction", e);
+ log.error("Error running revision garbage collection", e);
}
}
});
}
- /**
- * Run garbage collection: estimation, compaction, cleanup
- * @throws IOException
- */
- public void gc() throws IOException {
- gcListener.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet());
- Stopwatch watch = Stopwatch.createStarted();
-
- int gainThreshold = gcOptions.getGainThreshold();
- boolean sufficientEstimatedGain = true;
- if (gainThreshold <= 0) {
- gcListener.info("TarMK GC #{}: estimation skipped because gain threshold value ({} <= 0)",
- GC_COUNT, gainThreshold);
- } else if (gcOptions.isPaused()) {
- gcListener.info("TarMK GC #{}: estimation skipped because compaction is paused", GC_COUNT);
- } else {
- gcListener.info("TarMK GC #{}: estimation started", GC_COUNT);
- Supplier<Boolean> cancel = newCancelCompactionCondition();
- GCEstimation estimate = estimateCompactionGain(cancel);
- if (cancel.get()) {
- gcListener.info("TarMK GC #{}: estimation interrupted: {}. Skipping compaction.", GC_COUNT, cancel);
- }
-
- sufficientEstimatedGain = estimate.gcNeeded();
- String gcLog = estimate.gcLog();
- if (sufficientEstimatedGain) {
- gcListener.info(
- "TarMK GC #{}: estimation completed in {} ({} ms). {}",
- GC_COUNT, watch, watch.elapsed(MILLISECONDS), gcLog);
- } else {
- gcListener.skipped(
- "TarMK GC #{}: estimation completed in {} ({} ms). {}",
- GC_COUNT, watch, watch.elapsed(MILLISECONDS), gcLog);
- }
- }
-
- if (sufficientEstimatedGain) {
- if (!gcOptions.isPaused()) {
- logAndClear(segmentWriter.getNodeWriteTimeStats(), segmentWriter.getNodeCompactTimeStats());
- log(segmentWriter.getNodeCacheOccupancyInfo());
- Runnable cleanupTask = compact();
- if (cleanupTask != null) {
- cleanupTask.run();
- }
- logAndClear(segmentWriter.getNodeWriteTimeStats(), segmentWriter.getNodeCompactTimeStats());
- log(segmentWriter.getNodeCacheOccupancyInfo());
- } else {
- gcListener.skipped("TarMK GC #{}: compaction paused", GC_COUNT);
- }
- }
- }
-
- private static void logAndClear(
- @Nonnull DescriptiveStatistics nodeWriteTimeStats,
- @Nonnull DescriptiveStatistics nodeCompactTimeStats) {
- log.info("Node write time statistics (ns) {}", toString(nodeWriteTimeStats));
- log.info("Node compact time statistics (ns) {}", toString(nodeCompactTimeStats));
- nodeWriteTimeStats.clear();
- nodeCompactTimeStats.clear();
- }
-
- private static void log(@CheckForNull String nodeCacheOccupancyInfo) {
- if (nodeCacheOccupancyInfo != null) {
- log.info("NodeCache occupancy: {}", nodeCacheOccupancyInfo);
- }
- }
-
- private static String toString(DescriptiveStatistics statistics) {
- DecimalFormat sci = new DecimalFormat("##0.0E0");
- DecimalFormatSymbols symbols = sci.getDecimalFormatSymbols();
- symbols.setNaN("NaN");
- symbols.setInfinity("Inf");
- sci.setDecimalFormatSymbols(symbols);
- return "min=" + sci.format(statistics.getMin()) +
- ", 10%=" + sci.format(statistics.getPercentile(10.0)) +
- ", 50%=" + sci.format(statistics.getPercentile(50.0)) +
- ", 90%=" + sci.format(statistics.getPercentile(90.0)) +
- ", max=" + sci.format(statistics.getMax()) +
- ", mean=" + sci.format(statistics.getMean()) +
- ", stdev=" + sci.format(statistics.getStandardDeviation()) +
- ", N=" + sci.format(statistics.getN());
- }
-
static Map<Integer, Map<Character, File>> collectFiles(File directory) {
Map<Integer, Map<Character, File>> dataFiles = newHashMap();
Map<Integer, File> bulkFiles = newHashMap();
@@ -695,35 +597,6 @@ public class FileStore implements Segmen
}
}
- /**
- * Estimated compaction gain. The result will be undefined if stopped through
- * the passed {@code stop} signal.
- * @param stop signal for stopping the estimation process.
- * @return compaction gain estimate
- */
- GCEstimation estimateCompactionGain(Supplier<Boolean> stop) {
- if (gcOptions.isGcSizeDeltaEstimation()) {
- SizeDeltaGcEstimation e = new SizeDeltaGcEstimation(gcOptions,
- gcJournal, stats.getApproximateSize());
- return e;
- }
-
- CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(),
- count(), stop, gcOptions.getGainThreshold());
- fileStoreLock.readLock().lock();
- try {
- for (TarReader reader : readers) {
- reader.accept(estimate);
- if (stop.get()) {
- break;
- }
- }
- } finally {
- fileStoreLock.readLock().unlock();
- }
- return estimate;
- }
-
public FileStoreStats getStats() {
return stats;
}
@@ -735,209 +608,47 @@ public class FileStore implements Segmen
segmentWriter.flush();
tarWriter.flush();
stats.flushed();
-
+
return null;
}
});
}
/**
- * Try to acquire the passed {@code semaphore}
- * @param semaphore
- * @return a closable for releasing the {@code semaphore}
- * @throws IllegalStateException if acquiring the {@code semaphore} failed.
- */
- private static Closeable withSemaphore(@Nonnull final Semaphore semaphore) {
- if (!semaphore.tryAcquire()) {
- throw new IllegalStateException("Compaction or cleanup already in progress");
- }
- return new Closeable() {
- @Override
- public void close() {
- semaphore.release();
- }
- };
- }
-
- /**
- * Run garbage collection on the segment level: reclaim those data segments
- * that are from an old segment generation and those bulk segments that are not
- * reachable anymore.
- * Those tar files that shrink by at least 25% are rewritten to a new tar generation
- * skipping the reclaimed segments.
+ * Run garbage collection: estimation, compaction, cleanup
+ * @throws IOException
*/
- public void cleanup() throws IOException {
- fileReaper.add(cleanupOldGenerations(getGcGeneration()));
+ public void gc() throws IOException {
+ garbageCollector.run();
}
/**
- * Cleanup segments that are from an old generation. That segments whose generation
- * is {@code gcGeneration - SegmentGCOptions.getRetainedGenerations()} or older.
- * @param gcGeneration
- * @return list of files to be removed
- * @throws IOException
+ * Run the compaction gain estimation process.
+ * @return
*/
- private List<File> cleanupOldGenerations(int gcGeneration) throws IOException {
- final int reclaimGeneration = gcGeneration - gcOptions.getRetainedGenerations();
-
- Predicate<Integer> reclaimPredicate = new Predicate<Integer>() {
- @Override
- public boolean apply(Integer generation) {
- return generation <= reclaimGeneration;
- }
- };
- return cleanup(reclaimPredicate,
- "gc-count=" + GC_COUNT +
- ",gc-status=success" +
- ",store-generation=" + gcGeneration +
- ",reclaim-predicate=(generation<=" + reclaimGeneration + ")");
+ public GCEstimation estimateCompactionGain() {
+ return garbageCollector.estimateCompactionGain(Suppliers.ofInstance(false));
}
/**
- * Cleanup segments of the given generation {@code gcGeneration}.
- * @param gcGeneration
- * @return list of files to be removed
- * @throws IOException
+ * Copy every referenced record in data (non-bulk) segments. Bulk segments
+ * are fully kept (they are only removed in cleanup, if there is no
+ * reference to them).
+ * @return {@code true} on success, {@code false} otherwise.
*/
- private List<File> cleanupGeneration(final int gcGeneration) throws IOException {
- Predicate<Integer> cleanupPredicate = new Predicate<Integer>() {
- @Override
- public boolean apply(Integer generation) {
- return generation == gcGeneration;
- }
- };
- return cleanup(cleanupPredicate,
- "gc-count=" + GC_COUNT +
- ",gc-status=failed" +
- ",store-generation=" + (gcGeneration - 1) +
- ",reclaim-predicate=(generation==" + gcGeneration + ")");
+ public boolean compact() throws IOException {
+ return garbageCollector.compact() > 0;
}
/**
- * Cleanup segments whose generation matches the {@code reclaimGeneration} predicate.
- * @param reclaimGeneration
- * @param gcInfo gc information to be passed to {@link SegmentIdTable#clearSegmentIdTables(Set, String)}
- * @return list of files to be removed
- * @throws IOException
+ * Run garbage collection on the segment level: reclaim those data segments
+ * that are from an old segment generation and those bulk segments that are not
+ * reachable anymore.
+ * Those tar files that shrink by at least 25% are rewritten to a new tar generation
+ * skipping the reclaimed segments.
*/
- private List<File> cleanup(
- @Nonnull Predicate<Integer> reclaimGeneration,
- @Nonnull String gcInfo)
- throws IOException {
- try (Closeable s = withSemaphore(gcSemaphore)) {
- Stopwatch watch = Stopwatch.createStarted();
- Set<UUID> bulkRefs = newHashSet();
- Map<TarReader, TarReader> cleaned = newLinkedHashMap();
-
- long initialSize = 0;
- fileStoreLock.writeLock().lock();
- try {
- gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT);
-
- newWriter();
- segmentCache.clear();
-
- // Suggest to the JVM that now would be a good time
- // to clear stale weak references in the SegmentTracker
- System.gc();
-
- collectBulkReferences(bulkRefs);
-
- for (TarReader reader : readers) {
- cleaned.put(reader, reader);
- initialSize += reader.size();
- }
- } finally {
- fileStoreLock.writeLock().unlock();
- }
-
- gcListener.info("TarMK GC #{}: current repository size is {} ({} bytes)",
- GC_COUNT, humanReadableByteCount(initialSize), initialSize);
-
- Set<UUID> reclaim = newHashSet();
- for (TarReader reader : cleaned.keySet()) {
- reader.mark(bulkRefs, reclaim, reclaimGeneration);
- log.info("{}: size of bulk references/reclaim set {}/{}",
- reader, bulkRefs.size(), reclaim.size());
- if (shutdown) {
- gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
- break;
- }
- }
- Set<UUID> reclaimed = newHashSet();
- for (TarReader reader : cleaned.keySet()) {
- cleaned.put(reader, reader.sweep(reclaim, reclaimed));
- if (shutdown) {
- gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
- break;
- }
- }
-
- // it doesn't account for concurrent commits that might have happened
- long afterCleanupSize = 0;
-
- List<TarReader> oldReaders = newArrayList();
- fileStoreLock.writeLock().lock();
- try {
- // Replace current list of reader with the cleaned readers taking care not to lose
- // any new reader that might have come in through concurrent calls to newWriter()
- List<TarReader> sweptReaders = newArrayList();
- for (TarReader reader : readers) {
- if (cleaned.containsKey(reader)) {
- TarReader newReader = cleaned.get(reader);
- if (newReader != null) {
- sweptReaders.add(newReader);
- afterCleanupSize += newReader.size();
- }
- // if these two differ, the former represents the swept version of the latter
- if (newReader != reader) {
- oldReaders.add(reader);
- }
- } else {
- sweptReaders.add(reader);
- }
- }
- readers = sweptReaders;
- } finally {
- fileStoreLock.writeLock().unlock();
- }
- tracker.clearSegmentIdTables(reclaimed, gcInfo);
-
- // Close old readers *after* setting readers to the new readers to avoid accessing
- // a closed reader from readSegment()
- LinkedList<File> toRemove = newLinkedList();
- for (TarReader oldReader : oldReaders) {
- closeAndLogOnFail(oldReader);
- File file = oldReader.getFile();
- gcListener.info("TarMK GC #{}: cleanup marking file for deletion: {}", GC_COUNT, file.getName());
- toRemove.addLast(file);
- }
-
- long finalSize = size();
- long reclaimedSize = initialSize - afterCleanupSize;
- stats.reclaimed(reclaimedSize);
- gcJournal.persist(reclaimedSize, finalSize);
- gcListener.cleaned(reclaimedSize, finalSize);
- gcListener.info("TarMK GC #{}: cleanup completed in {} ({} ms). Post cleanup size is {} ({} bytes)" +
- " and space reclaimed {} ({} bytes).",
- GC_COUNT, watch, watch.elapsed(MILLISECONDS),
- humanReadableByteCount(finalSize), finalSize,
- humanReadableByteCount(reclaimedSize), reclaimedSize);
- return toRemove;
- }
- }
-
- private void collectBulkReferences(Set<UUID> bulkRefs) {
- Set<UUID> refs = newHashSet();
- for (SegmentId id : tracker.getReferencedSegmentIds()) {
- refs.add(id.asUUID());
- }
- tarWriter.collectReferences(refs);
- for (UUID ref : refs) {
- if (!isDataSegmentId(ref.getLeastSignificantBits())) {
- bulkRefs.add(ref);
- }
- }
+ public void cleanup() throws IOException {
+ garbageCollector.cleanup();
}
/**
@@ -953,209 +664,15 @@ public class FileStore implements Segmen
* @param collector reference collector called back for each blob reference found
*/
public void collectBlobReferences(ReferenceCollector collector) throws IOException {
- segmentWriter.flush();
- List<TarReader> tarReaders = newArrayList();
- fileStoreLock.writeLock().lock();
- try {
- newWriter();
- tarReaders.addAll(this.readers);
- } finally {
- fileStoreLock.writeLock().unlock();
- }
-
- int minGeneration = getGcGeneration() - gcOptions.getRetainedGenerations() + 1;
- for (TarReader tarReader : tarReaders) {
- tarReader.collectBlobReferences(collector, newReferenceReader(this), minGeneration);
- }
- }
-
- /**
- * Returns the cancellation policy for the compaction phase.
- * @return a supplier indicating if compaction should be canceled.
- */
- private Supplier<Boolean> newCancelCompactionCondition() {
- return new CancelCompactionSupplier(this);
+ garbageCollector.collectBlobReferences(collector);
}
/**
- * @param duration
- * @param unit
- * @return {@code Supplier} instance which returns true once the time specified in
- * {@code duration} and {@code unit} has passed.
+ * Cancel a running revision garbage collection compaction process as soon as possible.
+ * Does nothing if gc is not running.
*/
- private static Supplier<Boolean> timeOut(final long duration, @Nonnull final TimeUnit unit) {
- return new Supplier<Boolean>() {
- long deadline = currentTimeMillis() + MILLISECONDS.convert(duration, unit);
- @Override
- public Boolean get() {
- return currentTimeMillis() > deadline;
- }
- };
- }
-
- /**
- * @param supplier1
- * @param supplier2
- * @return {@code Supplier} instance that returns {@code true} iff {@code supplier1} returns
- * {@code true} or otherwise {@code supplier2} returns {@code true}.
- */
- private static Supplier<Boolean> or(
- @Nonnull Supplier<Boolean> supplier1,
- @Nonnull Supplier<Boolean> supplier2) {
- if (supplier1.get()) {
- return Suppliers.ofInstance(true);
- } else {
- return supplier2;
- }
- }
-
- /**
- * Copy every referenced record in data (non-bulk) segments. Bulk segments
- * are fully kept (they are only removed in cleanup, if there is no
- * reference to them).
- * @return A {@code Runnable} for cleaning up or {@code null} when compact failed.
- */
- @CheckForNull
- public Runnable compact() throws IOException {
- try (Closeable s = withSemaphore(gcSemaphore)) {
- Stopwatch watch = Stopwatch.createStarted();
- gcListener.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
-
- SegmentNodeState before = getHead();
- final int newGeneration = getGcGeneration() + 1;
- SegmentBufferWriter bufferWriter = new SegmentBufferWriter(this, tracker, segmentReader, "c", newGeneration);
- Supplier<Boolean> cancel = newCancelCompactionCondition();
- SegmentNodeState after = compact(bufferWriter, before, cancel);
- if (after == null) {
- gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel);
- return null;
- }
-
- gcListener.info("TarMK GC #{}: compacted {} to {}",
- GC_COUNT, before.getRecordId(), after.getRecordId());
-
- int cycles = 0;
- boolean success = false;
- while (cycles < gcOptions.getRetryCount() &&
- !(success = revisions.setHead(before.getRecordId(), after.getRecordId(), EXPEDITE_OPTION))) {
- // Some other concurrent changes have been made.
- // Rebase (and compact) those changes on top of the
- // compacted state before retrying to set the head.
- cycles++;
- gcListener.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
- "Compacting these commits. Cycle {} of {}",
- GC_COUNT, cycles, gcOptions.getRetryCount());
- SegmentNodeState head = getHead();
- after = compact(bufferWriter, head, cancel);
- if (after == null) {
- gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel);
- return null;
- }
-
- gcListener.info("TarMK GC #{}: compacted {} against {} to {}",
- GC_COUNT, head.getRecordId(), before.getRecordId(), after.getRecordId());
- before = head;
- }
-
- if (!success) {
- gcListener.info("TarMK GC #{}: compaction gave up compacting concurrent commits after {} cycles.",
- GC_COUNT, cycles);
- int forceTimeout = gcOptions.getForceTimeout();
- if (forceTimeout > 0) {
- gcListener.info("TarMK GC #{}: trying to force compact remaining commits for {} seconds",
- GC_COUNT, forceTimeout);
- cycles++;
- success = forceCompact(bufferWriter, or(cancel, timeOut(forceTimeout, SECONDS)));
- if (!success) {
- if (cancel.get()) {
- gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
- "Compaction was cancelled: {}.", GC_COUNT, cancel);
- } else {
- gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
- "Most likely compaction didn't get exclusive access to the store.", GC_COUNT);
- }
- }
- }
- }
-
- if (success) {
- gcListener.compacted(SUCCESS, newGeneration);
- gcListener.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles",
- GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles);
- return new Runnable() {
- @Override
- public void run() {
- try {
- fileReaper.add(cleanupOldGenerations(newGeneration));
- } catch (IOException e) {
- gcListener.error("TarMK GC #" + GC_COUNT + ": cleanup failed", e);
- }
- }
- };
- } else {
- gcListener.compacted(FAILURE, newGeneration);
- gcListener.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles",
- GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles);
- return new Runnable() {
- @Override
- public void run() {
- try {
- gcListener.info("TarMK GC #{}: cleaning up after failed compaction", GC_COUNT);
- fileReaper.add(cleanupGeneration(newGeneration));
- } catch (IOException e) {
- gcListener.error("TarMK GC #" + GC_COUNT + ": cleanup failed", e);
- }
- }
- };
- }
- } catch (InterruptedException e) {
- gcListener.error("TarMK GC #" + GC_COUNT + ": compaction interrupted", e);
- currentThread().interrupt();
- return null;
- } catch (Exception e) {
- gcListener.error("TarMK GC #" + GC_COUNT + ": compaction encountered an error", e);
- return null;
- }
- }
-
- private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState head,
- Supplier<Boolean> cancel)
- throws IOException {
- if (gcOptions.isOffline()) {
- SegmentWriter writer = new SegmentWriter(this, segmentReader, blobStore, new Default(), bufferWriter, binaryReferenceConsumer);
- return new Compactor(segmentReader, writer, blobStore, cancel, gcOptions)
- .compact(EMPTY_NODE, head, EMPTY_NODE);
- } else {
- return segmentWriter.writeNode(head, bufferWriter, cancel);
- }
- }
-
- private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter,
- @Nonnull final Supplier<Boolean> cancel)
- throws InterruptedException {
- return revisions.
- setHead(new Function<RecordId, RecordId>() {
- @Nullable
- @Override
- public RecordId apply(RecordId base) {
- try {
- long t0 = currentTimeMillis();
- SegmentNodeState after = compact(bufferWriter,
- segmentReader.readNode(base), cancel);
- if (after == null) {
- gcListener.info("TarMK GC #{}: compaction cancelled after {} seconds",
- GC_COUNT, (currentTimeMillis() - t0) / 1000);
- return null;
- } else {
- return after.getRecordId();
- }
- } catch (IOException e) {
- gcListener.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e);
- return null;
- }
- }
- },
- timeout(gcOptions.getForceTimeout(), SECONDS));
+ public void cancelGC() {
+ garbageCollector.cancel();
}
public Iterable<SegmentId> getSegmentIds() {
@@ -1504,7 +1021,7 @@ public class FileStore implements Segmen
return emptyMap();
}
- private void checkDiskSpace() {
+ private void checkDiskSpace(SegmentGCOptions gcOptions) {
long repositoryDiskSpace = size();
long availableDiskSpace = directory.getFreeSpace();
boolean updated = gcOptions.isDiskSpaceSufficient(repositoryDiskSpace, availableDiskSpace);
@@ -1619,7 +1136,7 @@ public class FileStore implements Segmen
}
@Override
- public Runnable compact() {
+ public boolean compact() {
throw new UnsupportedOperationException("Read Only Store");
}
@@ -1640,61 +1157,541 @@ public class FileStore implements Segmen
}
}
+ private class GarbageCollector {
+ @Nonnull
+ private final SegmentGCOptions gcOptions;
- /**
- * Represents the cancellation policy for the compaction phase. If the disk
- * space was considered insufficient at least once during compaction (or if
- * the space was never sufficient to begin with), compaction is considered
- * canceled. Furthermore when the file store is shutting down, compaction is
- * considered canceled.
- */
- private static class CancelCompactionSupplier implements Supplier<Boolean> {
+ /**
+ * {@code GcListener} listening to this instance's gc progress
+ */
+ @Nonnull
+ private final GCListener gcListener;
- private static enum REASON {
- UNKNOWN, DISK_SPACE, SHUTDOWN, MANUAL
- };
+ @Nonnull
+ private final GCJournal gcJournal;
+
+ private volatile boolean cancelled;
+
+ GarbageCollector(@Nonnull SegmentGCOptions gcOptions,
+ @Nonnull GCListener gcListener,
+ @Nonnull GCJournal gcJournal) {
+ this.gcOptions = gcOptions;
+ this.gcListener = gcListener;
+ this.gcJournal = gcJournal;
+ }
+
+ synchronized void run() throws IOException {
+ gcListener.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet());
+ Stopwatch watch = Stopwatch.createStarted();
- private REASON reason = REASON.UNKNOWN;
+ int gainThreshold = gcOptions.getGainThreshold();
+ boolean sufficientEstimatedGain = true;
+ if (gainThreshold <= 0) {
+ gcListener.info("TarMK GC #{}: estimation skipped because gain threshold value ({} <= 0)",
+ GC_COUNT, gainThreshold);
+ } else if (gcOptions.isPaused()) {
+ gcListener.info("TarMK GC #{}: estimation skipped because compaction is paused", GC_COUNT);
+ } else {
+ gcListener.info("TarMK GC #{}: estimation started", GC_COUNT);
+ Supplier<Boolean> cancel = new CancelCompactionSupplier(FileStore.this);
+ GCEstimation estimate = estimateCompactionGain(cancel);
+ if (cancel.get()) {
+ gcListener.info("TarMK GC #{}: estimation interrupted: {}. Skipping compaction.", GC_COUNT, cancel);
+ return;
+ }
- private final FileStore store;
+ sufficientEstimatedGain = estimate.gcNeeded();
+ String gcLog = estimate.gcLog();
+ if (sufficientEstimatedGain) {
+ gcListener.info(
+ "TarMK GC #{}: estimation completed in {} ({} ms). {}",
+ GC_COUNT, watch, watch.elapsed(MILLISECONDS), gcLog);
+ } else {
+ gcListener.skipped(
+ "TarMK GC #{}: estimation completed in {} ({} ms). {}",
+ GC_COUNT, watch, watch.elapsed(MILLISECONDS), gcLog);
+ }
+ }
- public CancelCompactionSupplier(FileStore store) {
- this.store = store;
- this.store.gcOptions.setStopCompaction(false);
+ if (sufficientEstimatedGain) {
+ if (!gcOptions.isPaused()) {
+ logAndClear(segmentWriter.getNodeWriteTimeStats(), segmentWriter.getNodeCompactTimeStats());
+ log(segmentWriter.getNodeCacheOccupancyInfo());
+ int gen = compact();
+ if (gen > 0) {
+ fileReaper.add(cleanupOldGenerations(gen));
+ } else if (gen < 0) {
+ gcListener.info("TarMK GC #{}: cleaning up after failed compaction", GC_COUNT);
+ fileReaper.add(cleanupGeneration(-gen));
+ }
+ logAndClear(segmentWriter.getNodeWriteTimeStats(), segmentWriter.getNodeCompactTimeStats());
+ log(segmentWriter.getNodeCacheOccupancyInfo());
+ } else {
+ gcListener.skipped("TarMK GC #{}: compaction paused", GC_COUNT);
+ }
+ }
}
- @Override
- public Boolean get() {
- // The outOfDiskSpace and shutdown flags can only transition from
- // false (their initial
- // values), to true. Once true, there should be no way to go back.
- if (!store.sufficientDiskSpace.get()) {
- reason = REASON.DISK_SPACE;
- return true;
+ /**
+ * Estimated compaction gain. The result will be undefined if stopped through
+ * the passed {@code stop} signal.
+ * @param stop signal for stopping the estimation process.
+ * @return compaction gain estimate
+ */
+ synchronized GCEstimation estimateCompactionGain(Supplier<Boolean> stop) {
+ if (gcOptions.isGcSizeDeltaEstimation()) {
+ SizeDeltaGcEstimation e = new SizeDeltaGcEstimation(gcOptions,
+ gcJournal, stats.getApproximateSize());
+ return e;
}
- if (store.shutdown) {
- reason = REASON.SHUTDOWN;
- return true;
+
+ CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(),
+ count(), stop, gcOptions.getGainThreshold());
+ fileStoreLock.readLock().lock();
+ try {
+ for (TarReader reader : readers) {
+ reader.accept(estimate);
+ if (stop.get()) {
+ break;
+ }
+ }
+ } finally {
+ fileStoreLock.readLock().unlock();
}
- if (store.gcOptions.isStopCompaction()) {
- reason = REASON.MANUAL;
- return true;
+ return estimate;
+ }
+
+ private void logAndClear(
+ @Nonnull DescriptiveStatistics nodeWriteTimeStats,
+ @Nonnull DescriptiveStatistics nodeCompactTimeStats) {
+ log.info("Node write time statistics (ns) {}", toString(nodeWriteTimeStats));
+ log.info("Node compact time statistics (ns) {}", toString(nodeCompactTimeStats));
+ nodeWriteTimeStats.clear();
+ nodeCompactTimeStats.clear();
+ }
+
+ private void log(@CheckForNull String nodeCacheOccupancyInfo) {
+ if (nodeCacheOccupancyInfo != null) {
+ log.info("NodeCache occupancy: {}", nodeCacheOccupancyInfo);
}
- return false;
}
- @Override
- public String toString() {
- switch (reason) {
- case DISK_SPACE:
- return "Not enough disk space available";
- case SHUTDOWN:
- return "FileStore shutdown request received";
- case MANUAL:
- return "GC stop request received";
- default:
- return "";
+ private String toString(DescriptiveStatistics statistics) {
+ DecimalFormat sci = new DecimalFormat("##0.0E0");
+ DecimalFormatSymbols symbols = sci.getDecimalFormatSymbols();
+ symbols.setNaN("NaN");
+ symbols.setInfinity("Inf");
+ sci.setDecimalFormatSymbols(symbols);
+ return "min=" + sci.format(statistics.getMin()) +
+ ", 10%=" + sci.format(statistics.getPercentile(10.0)) +
+ ", 50%=" + sci.format(statistics.getPercentile(50.0)) +
+ ", 90%=" + sci.format(statistics.getPercentile(90.0)) +
+ ", max=" + sci.format(statistics.getMax()) +
+ ", mean=" + sci.format(statistics.getMean()) +
+ ", stdev=" + sci.format(statistics.getStandardDeviation()) +
+ ", N=" + sci.format(statistics.getN());
+ }
+
+ synchronized int compact() throws IOException {
+ try {
+ Stopwatch watch = Stopwatch.createStarted();
+ gcListener.info("TarMK GC #{}: compaction started, gc options={}", GC_COUNT, gcOptions);
+
+ SegmentNodeState before = getHead();
+ final int newGeneration = getGcGeneration() + 1;
+ SegmentBufferWriter bufferWriter = new SegmentBufferWriter(FileStore.this, tracker, segmentReader, "c", newGeneration);
+ Supplier<Boolean> cancel = new CancelCompactionSupplier(FileStore.this);
+ SegmentNodeState after = compact(bufferWriter, before, cancel);
+ if (after == null) {
+ gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel);
+ return 0;
+ }
+
+ gcListener.info("TarMK GC #{}: compacted {} to {}",
+ GC_COUNT, before.getRecordId(), after.getRecordId());
+
+ int cycles = 0;
+ boolean success = false;
+ while (cycles < gcOptions.getRetryCount() &&
+ !(success = revisions.setHead(before.getRecordId(), after.getRecordId(), EXPEDITE_OPTION))) {
+ // Some other concurrent changes have been made.
+ // Rebase (and compact) those changes on top of the
+ // compacted state before retrying to set the head.
+ cycles++;
+ gcListener.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
+ "Compacting these commits. Cycle {} of {}",
+ GC_COUNT, cycles, gcOptions.getRetryCount());
+ SegmentNodeState head = getHead();
+ after = compact(bufferWriter, head, cancel);
+ if (after == null) {
+ gcListener.info("TarMK GC #{}: compaction cancelled: {}.", GC_COUNT, cancel);
+ return 0;
+ }
+
+ gcListener.info("TarMK GC #{}: compacted {} against {} to {}",
+ GC_COUNT, head.getRecordId(), before.getRecordId(), after.getRecordId());
+ before = head;
+ }
+
+ if (!success) {
+ gcListener.info("TarMK GC #{}: compaction gave up compacting concurrent commits after {} cycles.",
+ GC_COUNT, cycles);
+ int forceTimeout = gcOptions.getForceTimeout();
+ if (forceTimeout > 0) {
+ gcListener.info("TarMK GC #{}: trying to force compact remaining commits for {} seconds",
+ GC_COUNT, forceTimeout);
+ cycles++;
+ success = forceCompact(bufferWriter, or(cancel, timeOut(forceTimeout, SECONDS)));
+ if (!success) {
+ if (cancel.get()) {
+ gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
+ "Compaction was cancelled: {}.", GC_COUNT, cancel);
+ } else {
+ gcListener.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
+ "Most likely compaction didn't get exclusive access to the store.", GC_COUNT);
+ }
+ }
+ }
+ }
+
+ if (success) {
+ gcListener.compacted(SUCCESS, newGeneration);
+ gcListener.info("TarMK GC #{}: compaction succeeded in {} ({} ms), after {} cycles",
+ GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles);
+ return newGeneration;
+ } else {
+ gcListener.compacted(FAILURE, newGeneration);
+ gcListener.info("TarMK GC #{}: compaction failed after {} ({} ms), and {} cycles",
+ GC_COUNT, watch, watch.elapsed(MILLISECONDS), cycles);
+ return -newGeneration;
+ }
+ } catch (InterruptedException e) {
+ gcListener.error("TarMK GC #" + GC_COUNT + ": compaction interrupted", e);
+ currentThread().interrupt();
+ return 0;
+ } catch (Exception e) {
+ gcListener.error("TarMK GC #" + GC_COUNT + ": compaction encountered an error", e);
+ return 0;
+ }
+ }
+
+ /**
+ * @param duration
+ * @param unit
+ * @return {@code Supplier} instance which returns true once the time specified in
+ * {@code duration} and {@code unit} has passed.
+ */
+ private Supplier<Boolean> timeOut(final long duration, @Nonnull final TimeUnit unit) {
+ return new Supplier<Boolean>() {
+ long deadline = currentTimeMillis() + MILLISECONDS.convert(duration, unit);
+ @Override
+ public Boolean get() {
+ return currentTimeMillis() > deadline;
+ }
+ };
+ }
+
+ /**
+ * @param supplier1
+ * @param supplier2
+ * @return a {@code Supplier} that always returns {@code true} if {@code supplier1} returns {@code true}
+ * at the time this method is called (it is evaluated only once, eagerly); otherwise {@code supplier2} itself.
+ */
+ private Supplier<Boolean> or(
+ @Nonnull Supplier<Boolean> supplier1,
+ @Nonnull Supplier<Boolean> supplier2) {
+ if (supplier1.get()) {
+ return Suppliers.ofInstance(true);
+ } else {
+ return supplier2;
+ }
+ }
+
+ private SegmentNodeState compact(SegmentBufferWriter bufferWriter, NodeState head,
+ Supplier<Boolean> cancel)
+ throws IOException {
+ if (gcOptions.isOffline()) {
+ SegmentWriter writer = new SegmentWriter(FileStore.this, segmentReader, blobStore, new Default(), bufferWriter, binaryReferenceConsumer);
+ return new Compactor(segmentReader, writer, blobStore, cancel, gcOptions)
+ .compact(EMPTY_NODE, head, EMPTY_NODE);
+ } else {
+ return segmentWriter.writeNode(head, bufferWriter, cancel);
+ }
+ }
+
+ private boolean forceCompact(@Nonnull final SegmentBufferWriter bufferWriter,
+ @Nonnull final Supplier<Boolean> cancel)
+ throws InterruptedException {
+ return revisions.
+ setHead(new Function<RecordId, RecordId>() {
+ @Nullable
+ @Override
+ public RecordId apply(RecordId base) {
+ try {
+ long t0 = currentTimeMillis();
+ SegmentNodeState after = compact(bufferWriter,
+ segmentReader.readNode(base), cancel);
+ if (after == null) {
+ gcListener.info("TarMK GC #{}: compaction cancelled after {} seconds",
+ GC_COUNT, (currentTimeMillis() - t0) / 1000);
+ return null;
+ } else {
+ return after.getRecordId();
+ }
+ } catch (IOException e) {
+ gcListener.error("TarMK GC #{" + GC_COUNT + "}: Error during forced compaction.", e);
+ return null;
+ }
+ }
+ },
+ timeout(gcOptions.getForceTimeout(), SECONDS));
+ }
+
+ synchronized void cleanup() throws IOException {
+ fileReaper.add(cleanupOldGenerations(getGcGeneration()));
+ }
+
+ /**
+ * Cleanup segments that are from an old generation. That is, segments whose generation
+ * is {@code gcGeneration - SegmentGCOptions.getRetainedGenerations()} or older.
+ * @param gcGeneration
+ * @return list of files to be removed
+ * @throws IOException
+ */
+ private List<File> cleanupOldGenerations(int gcGeneration) throws IOException {
+ final int reclaimGeneration = gcGeneration - gcOptions.getRetainedGenerations();
+
+ Predicate<Integer> reclaimPredicate = new Predicate<Integer>() {
+ @Override
+ public boolean apply(Integer generation) {
+ return generation <= reclaimGeneration;
+ }
+ };
+ return cleanup(reclaimPredicate,
+ "gc-count=" + GC_COUNT +
+ ",gc-status=success" +
+ ",store-generation=" + gcGeneration +
+ ",reclaim-predicate=(generation<=" + reclaimGeneration + ")");
+ }
+
+ /**
+ * Cleanup segments whose generation matches the {@code reclaimGeneration} predicate.
+ * @param reclaimGeneration
+ * @param gcInfo gc information to be passed to {@link SegmentIdTable#clearSegmentIdTables(Set, String)}
+ * @return list of files to be removed
+ * @throws IOException
+ */
+ private List<File> cleanup(
+ @Nonnull Predicate<Integer> reclaimGeneration,
+ @Nonnull String gcInfo)
+ throws IOException {
+ Stopwatch watch = Stopwatch.createStarted();
+ Set<UUID> bulkRefs = newHashSet();
+ Map<TarReader, TarReader> cleaned = newLinkedHashMap();
+
+ long initialSize = 0;
+ fileStoreLock.writeLock().lock();
+ try {
+ gcListener.info("TarMK GC #{}: cleanup started.", GC_COUNT);
+
+ newWriter();
+ segmentCache.clear();
+
+ // Suggest to the JVM that now would be a good time
+ // to clear stale weak references in the SegmentTracker
+ System.gc();
+
+ collectBulkReferences(bulkRefs);
+
+ for (TarReader reader : readers) {
+ cleaned.put(reader, reader);
+ initialSize += reader.size();
+ }
+ } finally {
+ fileStoreLock.writeLock().unlock();
+ }
+
+ gcListener.info("TarMK GC #{}: current repository size is {} ({} bytes)",
+ GC_COUNT, humanReadableByteCount(initialSize), initialSize);
+
+ Set<UUID> reclaim = newHashSet();
+ for (TarReader reader : cleaned.keySet()) {
+ reader.mark(bulkRefs, reclaim, reclaimGeneration);
+ log.info("{}: size of bulk references/reclaim set {}/{}",
+ reader, bulkRefs.size(), reclaim.size());
+ if (shutdown) {
+ gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
+ break;
+ }
+ }
+ Set<UUID> reclaimed = newHashSet();
+ for (TarReader reader : cleaned.keySet()) {
+ cleaned.put(reader, reader.sweep(reclaim, reclaimed));
+ if (shutdown) {
+ gcListener.info("TarMK GC #{}: cleanup interrupted", GC_COUNT);
+ break;
+ }
+ }
+
+ // it doesn't account for concurrent commits that might have happened
+ long afterCleanupSize = 0;
+
+ List<TarReader> oldReaders = newArrayList();
+ fileStoreLock.writeLock().lock();
+ try {
+ // Replace current list of reader with the cleaned readers taking care not to lose
+ // any new reader that might have come in through concurrent calls to newWriter()
+ List<TarReader> sweptReaders = newArrayList();
+ for (TarReader reader : readers) {
+ if (cleaned.containsKey(reader)) {
+ TarReader newReader = cleaned.get(reader);
+ if (newReader != null) {
+ sweptReaders.add(newReader);
+ afterCleanupSize += newReader.size();
+ }
+ // if these two differ, the former represents the swept version of the latter
+ if (newReader != reader) {
+ oldReaders.add(reader);
+ }
+ } else {
+ sweptReaders.add(reader);
+ }
+ }
+ readers = sweptReaders;
+ } finally {
+ fileStoreLock.writeLock().unlock();
+ }
+ tracker.clearSegmentIdTables(reclaimed, gcInfo);
+
+ // Close old readers *after* setting readers to the new readers to avoid accessing
+ // a closed reader from readSegment()
+ LinkedList<File> toRemove = newLinkedList();
+ for (TarReader oldReader : oldReaders) {
+ closeAndLogOnFail(oldReader);
+ File file = oldReader.getFile();
+ gcListener.info("TarMK GC #{}: cleanup marking file for deletion: {}", GC_COUNT, file.getName());
+ toRemove.addLast(file);
+ }
+
+ long finalSize = size();
+ long reclaimedSize = initialSize - afterCleanupSize;
+ stats.reclaimed(reclaimedSize);
+ gcJournal.persist(reclaimedSize, finalSize);
+ gcListener.cleaned(reclaimedSize, finalSize);
+ gcListener.info("TarMK GC #{}: cleanup completed in {} ({} ms). Post cleanup size is {} ({} bytes)" +
+ " and space reclaimed {} ({} bytes).",
+ GC_COUNT, watch, watch.elapsed(MILLISECONDS),
+ humanReadableByteCount(finalSize), finalSize,
+ humanReadableByteCount(reclaimedSize), reclaimedSize);
+ return toRemove;
+ }
+
+ private void collectBulkReferences(Set<UUID> bulkRefs) {
+ Set<UUID> refs = newHashSet();
+ for (SegmentId id : tracker.getReferencedSegmentIds()) {
+ refs.add(id.asUUID());
+ }
+ tarWriter.collectReferences(refs);
+ for (UUID ref : refs) {
+ if (!isDataSegmentId(ref.getLeastSignificantBits())) {
+ bulkRefs.add(ref);
+ }
}
}
+
+ /**
+ * Cleanup segments of the given generation {@code gcGeneration}.
+ * @param gcGeneration
+ * @return list of files to be removed
+ * @throws IOException
+ */
+ private List<File> cleanupGeneration(final int gcGeneration) throws IOException {
+ Predicate<Integer> cleanupPredicate = new Predicate<Integer>() {
+ @Override
+ public boolean apply(Integer generation) {
+ return generation == gcGeneration;
+ }
+ };
+ return cleanup(cleanupPredicate,
+ "gc-count=" + GC_COUNT +
+ ",gc-status=failed" +
+ ",store-generation=" + (gcGeneration - 1) +
+ ",reclaim-predicate=(generation==" + gcGeneration + ")");
+ }
+
+ /**
+ * Finds all external blob references that are currently accessible
+ * in this repository and adds them to the given collector. Useful
+ * for collecting garbage in an external data store.
+ * <p>
+ * Note that this method only collects blob references that are already
+ * stored in the repository (at the time when this method is called), so
+ * the garbage collector will need some other mechanism for tracking
+ * in-memory references and references stored while this method is
+ * running.
+ * @param collector reference collector called back for each blob reference found
+ */
+ synchronized void collectBlobReferences(ReferenceCollector collector) throws IOException {
+ segmentWriter.flush();
+ List<TarReader> tarReaders = newArrayList();
+ fileStoreLock.writeLock().lock();
+ try {
+ newWriter();
+ tarReaders.addAll(FileStore.this.readers);
+ } finally {
+ fileStoreLock.writeLock().unlock();
+ }
+
+ int minGeneration = getGcGeneration() - gcOptions.getRetainedGenerations() + 1;
+ for (TarReader tarReader : tarReaders) {
+ tarReader.collectBlobReferences(collector, newReferenceReader(FileStore.this), minGeneration);
+ }
+ }
+
+ void cancel() {
+ cancelled = true;
+ }
+
+ /**
+ * Represents the cancellation policy for the compaction phase. If the disk
+ * space was considered insufficient at least once during compaction (or if
+ * the space was never sufficient to begin with), compaction is considered
+ * canceled. Furthermore, compaction is considered canceled when the file store
+ * is shutting down or when gc has been cancelled through {@code cancel()}.
+ */
+ private class CancelCompactionSupplier implements Supplier<Boolean> {
+ private final FileStore store;
+
+ private String reason;
+
+ public CancelCompactionSupplier(@Nonnull FileStore store) {
+ cancelled = false;
+ this.store = store;
+ }
+
+ @Override
+ public Boolean get() {
+ // The outOfDiskSpace and shutdown flags can only transition from
+ // false (their initial values), to true. Once true, there should
+ // be no way to go back.
+ if (!store.sufficientDiskSpace.get()) {
+ reason = "Not enough disk space";
+ return true;
+ }
+ if (store.shutdown) {
+ reason = "The FileStore is shutting down";
+ return true;
+ }
+ if (cancelled) {
+ reason = "Cancelled by user";
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() { return reason; }
+ }
}
+
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java Mon Oct 10 15:10:06 2016
@@ -529,7 +529,7 @@ public class CompactionAndCleanupIT {
public Boolean call() throws IOException {
boolean cancelled = false;
for (int k = 0; !cancelled && k < 1000; k++) {
- cancelled = fileStore.compact() == null;
+ cancelled = !fileStore.compact();
}
return cancelled;
}
@@ -1080,10 +1080,7 @@ public class CompactionAndCleanupIT {
Callable<Void> concurrentCleanupTask = new Callable<Void>() {
@Override
public Void call() throws Exception {
- // Concurrent cleanup calls are not supported by the file store
- synchronized (fileStore) {
- fileStore.cleanup();
- }
+ fileStore.cleanup();
return null;
}
};
@@ -1143,10 +1140,7 @@ public class CompactionAndCleanupIT {
final Callable<Void> concurrentCleanTask = new Callable<Void>() {
@Override
public Void call() throws Exception {
- // Concurrent cleanup calls are not supported by the file store
- synchronized (fileStore) {
- fileStore.cleanup();
- }
+ fileStore.cleanup();
return null;
}
};
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCompactionIT.java Mon Oct 10 15:10:06 2016
@@ -236,7 +236,7 @@ public class SegmentCompactionIT {
List<Registration> registrations = newArrayList();
registrations.add(registerMBean(segmentCompactionMBean,
new ObjectName("IT:TYPE=Segment Compaction")));
- registrations.add(registerMBean(new SegmentRevisionGCMBean(gcOptions),
+ registrations.add(registerMBean(new SegmentRevisionGCMBean(fileStore, gcOptions),
new ObjectName("IT:TYPE=Segment Revision GC")));
registrations.add(registerMBean(fileStoreGCMonitor,
new ObjectName("IT:TYPE=GC Monitor")));
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java?rev=1764116&r1=1764115&r2=1764116&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/CompactionEstimatorTest.java Mon Oct 10 15:10:06 2016
@@ -27,7 +27,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Random;
-import com.google.common.base.Suppliers;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.segment.SegmentNodeStore;
import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
@@ -80,13 +79,11 @@ public class CompactionEstimatorTest {
fileStore.flush();
try {
- GCEstimation est = fileStore.estimateCompactionGain(Suppliers
- .ofInstance(false));
+ GCEstimation est = fileStore.estimateCompactionGain();
assertTrue(est.gcNeeded());
if (est instanceof CompactionGainEstimate) {
// should be at 66%
- assertTrue(((CompactionGainEstimate) est)
- .estimateCompactionGain() > 60);
+ assertTrue(((CompactionGainEstimate) est).estimateCompactionGain() > 60);
}
} finally {
fileStore.close();