You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2018/04/23 11:47:24 UTC

svn commit: r1829859 [1/2] - /jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/

Author: frm
Date: Mon Apr 23 11:47:24 2018
New Revision: 1829859

URL: http://svn.apache.org/viewvc?rev=1829859&view=rev
Log:
OAK-7377 - Allow multiple GC implementations in FileStore

Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CancelCompactionSupplier.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollector.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PrefixedGCListener.java

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CancelCompactionSupplier.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CancelCompactionSupplier.java?rev=1829859&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CancelCompactionSupplier.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CancelCompactionSupplier.java Mon Apr 23 11:47:24 2018
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Represents the cancellation policy for the compaction phase. If the disk
+ * space was considered insufficient at least once during compaction (or if the
+ * space was never sufficient to begin with), compaction is considered canceled.
+ * Furthermore, when the file store is shutting down, compaction is considered
+ * canceled. Finally, the cancellation can be triggered by a timeout that can be
+ * set at any time.
+ */
+class CancelCompactionSupplier implements Supplier<Boolean> {
+
+    private final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+    private final BooleanSupplier diskSpaceExhausted;
+
+    private final BooleanSupplier memoryExhausted;
+
+    private final BooleanSupplier shutDown;
+
+    private String reason;
+
+    private volatile long baseLine;
+
+    private volatile long deadline;
+
+    CancelCompactionSupplier(
+        BooleanSupplier diskSpaceExhausted,
+        BooleanSupplier memoryExhausted,
+        BooleanSupplier shutDown
+    ) {
+        this.diskSpaceExhausted = diskSpaceExhausted;
+        this.memoryExhausted = memoryExhausted;
+        this.shutDown = shutDown;
+    }
+
+    /**
+     * Set a timeout for cancellation. Setting a different timeout cancels a
+     * previous one that did not yet elapse. Setting a timeout after
+     * cancellation took place has no effect.
+     */
+    public void timeOutAfter(final long duration, @Nonnull final TimeUnit unit) {
+        baseLine = currentTimeMillis();
+        deadline = baseLine + MILLISECONDS.convert(duration, unit);
+    }
+
+    @Override
+    public Boolean get() {
+        // The diskSpaceExhausted, memoryExhausted and shutDown suppliers can
+        // only transition from false (their initial values) to true. Once
+        // true, there should be no way to go back.
+        if (diskSpaceExhausted.getAsBoolean()) {
+            reason = "Not enough disk space";
+            return true;
+        }
+        if (memoryExhausted.getAsBoolean()) {
+            reason = "Not enough memory";
+            return true;
+        }
+        if (shutDown.getAsBoolean()) {
+            reason = "The FileStore is shutting down";
+            return true;
+        }
+        if (cancelled.get()) {
+            reason = "Cancelled by user";
+            return true;
+        }
+        if (deadline > 0 && currentTimeMillis() > deadline) {
+            long dt = SECONDS.convert(currentTimeMillis() - baseLine, MILLISECONDS);
+            reason = "Timeout after " + dt + " seconds";
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return reason;
+    }
+
+    public void cancel() {
+        cancelled.set(true);
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CancelCompactionSupplier.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java?rev=1829859&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java Mon Apr 23 11:47:24 2018
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static org.apache.jackrabbit.oak.segment.file.Reclaimers.newOldReclaimer;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Predicate;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+
+/**
+ * Instances of this class represent the result from a compaction. Either
+ * succeeded, aborted or skipped.
+ */
+abstract class CompactionResult {
+
+    @Nonnull
+    private final GCGeneration currentGeneration;
+
+    private final int gcCount;
+
+    private CompactionResult(@Nonnull GCGeneration currentGeneration, int gcCount) {
+        this.currentGeneration = currentGeneration;
+        this.gcCount = gcCount;
+    }
+
+    /**
+     * Result of a succeeded compaction.
+     *
+     * @param gcType          the type of the succeeded compaction operation
+     * @param newGeneration   the generation successfully created by compaction
+     * @param gcOptions       the current GC options used by compaction
+     * @param compactedRootId the record id of the root created by compaction
+     */
+    static CompactionResult succeeded(
+        @Nonnull SegmentGCOptions.GCType gcType,
+        @Nonnull GCGeneration newGeneration,
+        @Nonnull final SegmentGCOptions gcOptions,
+        @Nonnull final RecordId compactedRootId,
+        int gcCount
+    ) {
+        return new CompactionResult(newGeneration, gcCount) {
+
+            @Override
+            Predicate<GCGeneration> reclaimer() {
+                return newOldReclaimer(gcType, newGeneration, gcOptions.getRetainedGenerations());
+            }
+
+            @Override
+            boolean isSuccess() {
+                return true;
+            }
+
+            @Override
+            RecordId getCompactedRootId() {
+                return compactedRootId;
+            }
+        };
+    }
+
+    /**
+     * Result of an aborted compaction.
+     *
+     * @param currentGeneration the current generation of the store
+     * @param failedGeneration  the generation that compaction attempted to
+     *                          create
+     */
+    static CompactionResult aborted(
+        @Nonnull GCGeneration currentGeneration,
+        @Nonnull final GCGeneration failedGeneration,
+        int gcCount
+    ) {
+        return new CompactionResult(currentGeneration, gcCount) {
+
+            @Override
+            Predicate<GCGeneration> reclaimer() {
+                return Reclaimers.newExactReclaimer(failedGeneration);
+            }
+
+            @Override
+            boolean isSuccess() {
+                return false;
+            }
+        };
+    }
+
+    /**
+     * Result serving as a placeholder for a compaction that was skipped.
+     *
+     * @param lastGCType        type of the most recent gc operation. {@link
+     *                          SegmentGCOptions.GCType#FULL} if none.
+     * @param currentGeneration the current generation of the store
+     * @param gcOptions         the current GC options used by compaction
+     */
+    static CompactionResult skipped(
+        @Nonnull SegmentGCOptions.GCType lastGCType,
+        @Nonnull GCGeneration currentGeneration,
+        @Nonnull final SegmentGCOptions gcOptions,
+        @Nonnull final RecordId compactedRootId,
+        int gcCount
+    ) {
+        return new CompactionResult(currentGeneration, gcCount) {
+
+            @Override
+            Predicate<GCGeneration> reclaimer() {
+                return Reclaimers.newOldReclaimer(lastGCType, currentGeneration, gcOptions.getRetainedGenerations());
+            }
+
+            @Override
+            boolean isSuccess() {
+                return true;
+            }
+
+            @Override
+            RecordId getCompactedRootId() {
+                return compactedRootId;
+            }
+        };
+    }
+
+    /**
+     * @return a predicate determining which segments to {@link
+     * FileStore.GarbageCollector#cleanup(CompactionResult) clean up} for the
+     * given compaction result.
+     */
+    abstract Predicate<GCGeneration> reclaimer();
+
+    /**
+     * @return {@code true} for succeeded and skipped, {@code false} otherwise.
+     */
+    abstract boolean isSuccess();
+
+    /**
+     * @return the record id of the compacted root on {@link #isSuccess()
+     * success}, {@link RecordId#NULL} otherwise.
+     */
+    RecordId getCompactedRootId() {
+        return RecordId.NULL;
+    }
+
+    /**
+     * @return a diagnostic message describing the outcome of this compaction.
+     */
+    String gcInfo() {
+        return String.format(
+            "gc-count=%d,gc-status=%s,store-generation=%s,reclaim-predicate=%s",
+            gcCount,
+            isSuccess() ? "success" : "failed",
+            currentGeneration,
+            reclaimer()
+        );
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java?rev=1829859&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java Mon Apr 23 11:47:24 2018
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType.FULL;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType.TAIL;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.CLEANUP;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION_FORCE_COMPACT;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION_RETRY;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.ESTIMATION;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.IDLE;
+import static org.apache.jackrabbit.oak.segment.file.PrintableBytes.newPrintableBytes;
+import static org.apache.jackrabbit.oak.segment.file.TarRevisions.EXPEDITE_OPTION;
+import static org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
+import org.apache.jackrabbit.oak.segment.CheckpointCompactor;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
+import org.apache.jackrabbit.oak.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
+import org.apache.jackrabbit.oak.segment.file.tar.CleanupContext;
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+class DefaultGarbageCollectionStrategy implements GarbageCollectionStrategy {
+
+    private GCGeneration getGcGeneration(Context context) {
+        return context.getRevisions().getHead().getSegmentId().getGcGeneration();
+    }
+
+    private SegmentNodeState getBase(Context context) {
+        String root = context.getGCJournal().read().getRoot();
+        RecordId rootId = RecordId.fromString(context.getSegmentTracker(), root);
+        if (RecordId.NULL.equals(rootId)) {
+            return null;
+        }
+        try {
+            SegmentNodeState node = context.getSegmentReader().readNode(rootId);
+            node.getPropertyCount();  // Resilience: fail early with a SNFE if the segment is not there
+            return node;
+        } catch (SegmentNotFoundException snfe) {
+            context.getGCListener().error("base state " + rootId + " is not accessible", snfe);
+            return null;
+        }
+    }
+
+    private SegmentNodeState getHead(Context context) {
+        return context.getSegmentReader().readHeadState(context.getRevisions());
+    }
+
+    @Override
+    public synchronized void collectGarbage(Context context) throws IOException {
+        switch (context.getGCOptions().getGCType()) {
+            case FULL:
+                collectFullGarbage(context);
+                break;
+            case TAIL:
+                collectTailGarbage(context);
+                break;
+            default:
+                throw new IllegalStateException("Invalid GC type");
+        }
+    }
+
+    @Override
+    public synchronized void collectFullGarbage(Context context) throws IOException {
+        run(context, true, this::compactFull);
+    }
+
+    @Override
+    public synchronized void collectTailGarbage(Context context) throws IOException {
+        run(context, false, this::compactTail);
+    }
+
+    private interface Compactor {
+
+        CompactionResult compact(Context context) throws IOException;
+
+    }
+
+    private void run(Context context, boolean full, Compactor compact) throws IOException {
+        try {
+            context.getGCListener().info("started");
+
+            long dt = System.currentTimeMillis() - context.getLastSuccessfulGC();
+
+            if (dt < context.getGCBackOff()) {
+                context.getGCListener().skipped("skipping garbage collection as it already ran less than {} hours ago ({} s).", context.getGCBackOff() / 3600000, dt / 1000);
+                return;
+            }
+
+            boolean sufficientEstimatedGain = true;
+            if (context.getGCOptions().isEstimationDisabled()) {
+                context.getGCListener().info("estimation skipped because it was explicitly disabled");
+            } else if (context.getGCOptions().isPaused()) {
+                context.getGCListener().info("estimation skipped because compaction is paused");
+            } else {
+                context.getGCListener().info("estimation started");
+                context.getGCListener().updateStatus(ESTIMATION.message());
+
+                PrintableStopwatch watch = PrintableStopwatch.createStarted();
+                GCEstimationResult estimation = estimateCompactionGain(context, full);
+                sufficientEstimatedGain = estimation.isGcNeeded();
+                String gcLog = estimation.getGcLog();
+                if (sufficientEstimatedGain) {
+                    context.getGCListener().info("estimation completed in {}. {}", watch, gcLog);
+                } else {
+                    context.getGCListener().skipped("estimation completed in {}. {}", watch, gcLog);
+                }
+            }
+
+            if (sufficientEstimatedGain) {
+                try (GCMemoryBarrier ignored = new GCMemoryBarrier(context.getSufficientMemory(), context.getGCListener(), context.getGCOptions())) {
+                    if (context.getGCOptions().isPaused()) {
+                        context.getGCListener().skipped("compaction paused");
+                    } else if (!context.getSufficientMemory().get()) {
+                        context.getGCListener().skipped("compaction skipped. Not enough memory");
+                    } else {
+                        CompactionResult compactionResult = compact.compact(context);
+                        if (compactionResult.isSuccess()) {
+                            context.getSuccessfulGarbageCollectionListener().onSuccessfulGarbageCollection();
+                        } else {
+                            context.getGCListener().info("cleaning up after failed compaction");
+                        }
+                        context.getFileReaper().add(cleanup(context, compactionResult));
+                    }
+                }
+            }
+        } finally {
+            context.getCompactionMonitor().finished();
+            context.getGCListener().updateStatus(IDLE.message());
+        }
+    }
+
+    @Override
+    public synchronized CompactionResult compactFull(Context context) {
+        context.getGCListener().info("running full compaction");
+        return compact(context, FULL, EMPTY_NODE, getGcGeneration(context).nextFull());
+    }
+
+    @Override
+    public synchronized CompactionResult compactTail(Context context) {
+        context.getGCListener().info("running tail compaction");
+        SegmentNodeState base = getBase(context);
+        if (base != null) {
+            return compact(context, TAIL, base, getGcGeneration(context).nextTail());
+        }
+        context.getGCListener().info("no base state available, running full compaction instead");
+        return compact(context, FULL, EMPTY_NODE, getGcGeneration(context).nextFull());
+    }
+
+    private CompactionResult compact(Context context, SegmentGCOptions.GCType gcType, NodeState base, GCGeneration newGeneration) {
+        try {
+            PrintableStopwatch watch = PrintableStopwatch.createStarted();
+            context.getGCListener().info(
+                "compaction started, gc options={}, current generation={}, new generation={}",
+                context.getGCOptions(),
+                getHead(context).getRecordId().getSegment().getGcGeneration(),
+                newGeneration
+            );
+            context.getGCListener().updateStatus(COMPACTION.message());
+
+            GCJournal.GCJournalEntry gcEntry = context.getGCJournal().read();
+            long initialSize = size(context);
+
+            SegmentWriter writer = context.getSegmentWriterFactory().newSegmentWriter(newGeneration);
+
+            context.getCompactionMonitor().init(gcEntry.getRepoSize(), gcEntry.getNodes(), initialSize);
+
+            CheckpointCompactor compactor = new CheckpointCompactor(
+                context.getGCListener(),
+                context.getSegmentReader(),
+                writer,
+                context.getBlobStore(),
+                context.getCanceller(),
+                context.getCompactionMonitor()
+            );
+
+            SegmentNodeState head = getHead(context);
+            SegmentNodeState compacted = compactor.compact(base, head, base);
+            if (compacted == null) {
+                context.getGCListener().warn("compaction cancelled: {}.", context.getCanceller());
+                return compactionAborted(context, newGeneration);
+            }
+
+            context.getGCListener().info("compaction cycle 0 completed in {}. Compacted {} to {}",
+                watch, head.getRecordId(), compacted.getRecordId());
+
+            int cycles = 0;
+            boolean success = false;
+            SegmentNodeState previousHead = head;
+            while (cycles < context.getGCOptions().getRetryCount() &&
+                !(success = context.getRevisions().setHead(previousHead.getRecordId(), compacted.getRecordId(), EXPEDITE_OPTION))) {
+                // Some other concurrent changes have been made.
+                // Rebase (and compact) those changes on top of the
+                // compacted state before retrying to set the head.
+                cycles++;
+                context.getGCListener().info("compaction detected concurrent commits while compacting. " +
+                        "Compacting these commits. Cycle {} of {}",
+                    cycles, context.getGCOptions().getRetryCount());
+                context.getGCListener().updateStatus(COMPACTION_RETRY.message() + cycles);
+                PrintableStopwatch cycleWatch = PrintableStopwatch.createStarted();
+
+                head = getHead(context);
+                compacted = compactor.compact(previousHead, head, compacted);
+                if (compacted == null) {
+                    context.getGCListener().warn("compaction cancelled: {}.", context.getCanceller());
+                    return compactionAborted(context, newGeneration);
+                }
+
+                context.getGCListener().info("compaction cycle {} completed in {}. Compacted {} against {} to {}",
+                    cycles, cycleWatch, head.getRecordId(), previousHead.getRecordId(), compacted.getRecordId());
+                previousHead = head;
+            }
+
+            if (!success) {
+                context.getGCListener().info("compaction gave up compacting concurrent commits after {} cycles.", cycles);
+                int forceTimeout = context.getGCOptions().getForceTimeout();
+                if (forceTimeout > 0) {
+                    context.getGCListener().info("trying to force compact remaining commits for {} seconds. " +
+                            "Concurrent commits to the store will be blocked.",
+                        forceTimeout);
+                    context.getGCListener().updateStatus(COMPACTION_FORCE_COMPACT.message());
+                    PrintableStopwatch forceWatch = PrintableStopwatch.createStarted();
+
+                    cycles++;
+                    context.getCanceller().timeOutAfter(forceTimeout, SECONDS);
+                    compacted = forceCompact(context, previousHead, compacted, compactor);
+                    success = compacted != null;
+                    if (success) {
+                        context.getGCListener().info("compaction succeeded to force compact remaining commits after {}.", forceWatch);
+                    } else {
+                        if (context.getCanceller().get()) {
+                            context.getGCListener().warn("compaction failed to force compact remaining commits " +
+                                    "after {}. Compaction was cancelled: {}.",
+                                forceWatch, context.getCanceller());
+                        } else {
+                            context.getGCListener().warn("compaction failed to force compact remaining commits. " +
+                                    "after {}. Could not acquire exclusive access to the node store.",
+                                forceWatch);
+                        }
+                    }
+                }
+            }
+
+            if (success) {
+                // Update type of the last compaction before calling methods that could throw an exception.
+                context.getSuccessfulCompactionListener().onSuccessfulCompaction(gcType);
+                writer.flush();
+                context.getFlusher().flush();
+                context.getGCListener().info("compaction succeeded in {}, after {} cycles", watch, cycles);
+                return compactionSucceeded(context, gcType, newGeneration, compacted.getRecordId());
+            } else {
+                context.getGCListener().info("compaction failed after {}, and {} cycles", watch, cycles);
+                return compactionAborted(context, newGeneration);
+            }
+        } catch (InterruptedException e) {
+            context.getGCListener().error("compaction interrupted", e);
+            currentThread().interrupt();
+            return compactionAborted(context, newGeneration);
+        } catch (IOException e) {
+            context.getGCListener().error("compaction encountered an error", e);
+            return compactionAborted(context, newGeneration);
+        }
+    }
+
+    private CompactionResult compactionAborted(Context context, GCGeneration generation) {
+        context.getGCListener().compactionFailed(generation);
+        return CompactionResult.aborted(getGcGeneration(context), generation, context.getGCCount());
+    }
+
+    private CompactionResult compactionSucceeded(
+        Context context,
+        SegmentGCOptions.GCType gcType,
+        GCGeneration generation,
+        RecordId compactedRootId
+    ) {
+        context.getGCListener().compactionSucceeded(generation);
+        return CompactionResult.succeeded(gcType, generation, context.getGCOptions(), compactedRootId, context.getGCCount());
+    }
+
+    private SegmentNodeState forceCompact(
+        Context context,
+        final NodeState base,
+        final NodeState onto,
+        final CheckpointCompactor compactor
+    ) throws InterruptedException {
+        RecordId compactedId = setHead(context, headId -> {
+            try {
+                PrintableStopwatch t = PrintableStopwatch.createStarted();
+                SegmentNodeState after = compactor.compact(base, context.getSegmentReader().readNode(headId), onto);
+                if (after != null) {
+                    return after.getRecordId();
+                }
+                context.getGCListener().info("compaction cancelled after {}", t);
+                return null;
+            } catch (IOException e) {
+                context.getGCListener().error("error during forced compaction.", e);
+                return null;
+            }
+        });
+        if (compactedId == null) {
+            return null;
+        }
+        return context.getSegmentReader().readNode(compactedId);
+    }
+
+    private RecordId setHead(Context context, Function<RecordId, RecordId> f) throws InterruptedException {
+        return context.getRevisions().setHead(f, timeout(context.getGCOptions().getForceTimeout(), SECONDS));
+    }
+
+    private GCEstimationResult estimateCompactionGain(Context context, boolean full) {
+        return new SizeDeltaGcEstimation(
+            context.getGCOptions().getGcSizeDeltaEstimation(),
+            context.getGCJournal(),
+            context.getTarFiles().size(),
+            full
+        ).estimate();
+    }
+
+    private long size(Context context) {
+        return context.getTarFiles().size();
+    }
+
+    @Override
+    public synchronized List<String> cleanup(Context context) throws IOException {
+        return cleanup(context, CompactionResult.skipped(
+            context.getLastCompactionType(),
+            getGcGeneration(context),
+            context.getGCOptions(),
+            context.getRevisions().getHead(),
+            context.getGCCount()
+        ));
+    }
+
+    /**
+     * Cleanup segments whose generation matches the
+     * {@link CompactionResult#reclaimer()} predicate of the given compaction
+     * result.
+     *
+     * @return list of files that can safely be removed
+     * @throws IOException if cleaning up the TAR files fails
+     */
+    private List<String> cleanup(Context context, CompactionResult compactionResult)
+        throws IOException {
+        PrintableStopwatch watch = PrintableStopwatch.createStarted();
+
+        context.getGCListener().info("cleanup started using reclaimer {}", compactionResult.reclaimer());
+        context.getGCListener().updateStatus(CLEANUP.message());
+        context.getSegmentCache().clear();
+
+        // Suggest to the JVM that now would be a good time
+        // to clear stale weak references in the SegmentTracker
+        System.gc();
+
+        TarFiles.CleanupResult cleanupResult = context.getTarFiles().cleanup(newCleanupContext(context, compactionResult.reclaimer()));
+        if (cleanupResult.isInterrupted()) {
+            context.getGCListener().info("cleanup interrupted");
+        }
+        context.getSegmentTracker().clearSegmentIdTables(cleanupResult.getReclaimedSegmentIds(), compactionResult.gcInfo());
+        context.getGCListener().info("cleanup marking files for deletion: {}", toFileNames(cleanupResult.getRemovableFiles()));
+
+        long finalSize = size(context);
+        long reclaimedSize = cleanupResult.getReclaimedSize();
+        context.getFileStoreStats().reclaimed(reclaimedSize);
+        // Persist the outcome so subsequent estimations and tail compactions
+        // (which read the GC journal) can build on it.
+        context.getGCJournal().persist(
+            reclaimedSize,
+            finalSize,
+            getGcGeneration(context),
+            context.getCompactionMonitor().getCompactedNodes(),
+            compactionResult.getCompactedRootId().toString10()
+        );
+        context.getGCListener().cleaned(reclaimedSize, finalSize);
+        context.getGCListener().info(
+            "cleanup completed in {}. Post cleanup size is {} and space reclaimed {}.",
+            watch,
+            newPrintableBytes(finalSize),
+            newPrintableBytes(reclaimedSize)
+        );
+        return cleanupResult.getRemovableFiles();
+    }
+
+    /**
+     * Create a {@link CleanupContext} for the TAR file cleanup. Data segments
+     * are reclaimed when their generation matches the {@code old} predicate;
+     * bulk segments are reclaimed when they are unreferenced. The initial
+     * reference set is the bulk segments currently referenced by the segment
+     * tracker, and the segment graph is only followed into bulk segments.
+     */
+    private CleanupContext newCleanupContext(Context context, Predicate<GCGeneration> old) {
+        return new CleanupContext() {
+
+            private boolean isUnreferencedBulkSegment(UUID id, boolean referenced) {
+                return !isDataSegmentId(id.getLeastSignificantBits()) && !referenced;
+            }
+
+            private boolean isOldDataSegment(UUID id, GCGeneration generation) {
+                return isDataSegmentId(id.getLeastSignificantBits()) && old.apply(generation);
+            }
+
+            @Override
+            public Collection<UUID> initialReferences() {
+                Set<UUID> references = newHashSet();
+                for (SegmentId id : context.getSegmentTracker().getReferencedSegmentIds()) {
+                    if (id.isBulkSegmentId()) {
+                        references.add(id.asUUID());
+                    }
+                }
+                return references;
+            }
+
+            @Override
+            public boolean shouldReclaim(UUID id, GCGeneration generation, boolean referenced) {
+                return isUnreferencedBulkSegment(id, referenced) || isOldDataSegment(id, generation);
+            }
+
+            @Override
+            public boolean shouldFollow(UUID from, UUID to) {
+                return !isDataSegmentId(to.getLeastSignificantBits());
+            }
+
+        };
+    }
+
+    // Render the file list as a comma-separated string, or "none" when empty,
+    // for use in log messages.
+    private String toFileNames(@Nonnull List<String> files) {
+        if (files.isEmpty()) {
+            return "none";
+        } else {
+            return Joiner.on(",").join(files);
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1829859&r1=1829858&r2=1829859&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Mon Apr 23 11:47:24 2018
@@ -18,73 +18,41 @@
  */
 package org.apache.jackrabbit.oak.segment.file;
 
-import static com.google.common.collect.Sets.newHashSet;
-import static java.lang.Integer.getInteger;
 import static java.lang.String.format;
-import static java.lang.System.currentTimeMillis;
-import static java.lang.Thread.currentThread;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
-import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType.FULL;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType.TAIL;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.CLEANUP;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION_FORCE_COMPACT;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION_RETRY;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.ESTIMATION;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.IDLE;
 import static org.apache.jackrabbit.oak.segment.file.PrintableBytes.newPrintableBytes;
-import static org.apache.jackrabbit.oak.segment.file.Reclaimers.newOldReclaimer;
-import static org.apache.jackrabbit.oak.segment.file.TarRevisions.EXPEDITE_OPTION;
-import static org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 
-import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.io.Closer;
 import com.google.common.util.concurrent.UncheckedExecutionException;
-import org.apache.jackrabbit.oak.segment.CheckpointCompactor;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.Segment;
 import org.apache.jackrabbit.oak.segment.SegmentId;
 import org.apache.jackrabbit.oak.segment.SegmentNodeState;
-import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
 import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
 import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener;
 import org.apache.jackrabbit.oak.segment.SegmentWriter;
-import org.apache.jackrabbit.oak.segment.WriterCacheManager;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
-import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType;
-import org.apache.jackrabbit.oak.segment.file.GCJournal.GCJournalEntry;
 import org.apache.jackrabbit.oak.segment.file.ShutDown.ShutDownCloser;
-import org.apache.jackrabbit.oak.segment.file.tar.CleanupContext;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,20 +63,8 @@ public class FileStore extends AbstractF
 
     private static final Logger log = LoggerFactory.getLogger(FileStore.class);
 
-    /**
-     * Minimal interval in milli seconds between subsequent garbage collection cycles.
-     * Garbage collection invoked via {@link #fullGC()} will be skipped unless at least
-     * the specified time has passed since its last successful invocation.
-     */
-    private static final long GC_BACKOFF = getInteger("oak.gc.backoff", 10*3600*1000);
-
     private static final int MB = 1024 * 1024;
 
-    /**
-     * GC counter for logging purposes
-     */
-    private static final AtomicLong GC_COUNT = new AtomicLong(0);
-
     @Nonnull
     private final SegmentWriter segmentWriter;
 
@@ -152,6 +108,8 @@ public class FileStore extends AbstractF
     @Nonnull
     private final SegmentNotFoundExceptionListener snfeListener;
 
+    private final GarbageCollectionStrategy garbageCollectionStrategy = new DefaultGarbageCollectionStrategy();
+
     FileStore(final FileStoreBuilder builder) throws InvalidFileStoreVersionException, IOException {
         super(builder);
 
@@ -164,12 +122,6 @@ public class FileStore extends AbstractF
                 .with(builder.getCacheManager()
                         .withAccessTracking("WRITE", builder.getStatsProvider()))
                 .build(this);
-        this.garbageCollector = new GarbageCollector(
-                builder.getGcOptions(),
-                builder.getGcListener(),
-                new GCJournal(persistence.getGCJournalFile()),
-                builder.getCacheManager()
-                        .withAccessTracking("COMPACT", builder.getStatsProvider()));
 
         newManifestChecker(persistence, builder.getStrictVersionCheck()).checkAndUpdateManifest();
 
@@ -190,6 +142,34 @@ public class FileStore extends AbstractF
 
         this.fileReaper = this.tarFiles.createFileReaper();
 
+        this.garbageCollector = new GarbageCollector(
+            builder.getGcOptions(),
+            builder.getGcListener(),
+            new GCJournal(persistence.getGCJournalFile()),
+            sufficientMemory,
+            fileReaper,
+            tarFiles,
+            tracker,
+            segmentReader,
+            () -> revisions,
+            getBlobStore(),
+            segmentCache,
+            segmentWriter,
+            stats,
+            new CancelCompactionSupplier(
+                () -> !sufficientDiskSpace.get(),
+                () -> !sufficientMemory.get(),
+                shutDown::isShutDown
+            ),
+            this::flush,
+            generation ->
+                defaultSegmentWriterBuilder("c")
+                    .with(builder.getCacheManager().withAccessTracking("COMPACT", builder.getStatsProvider()))
+                    .withGeneration(generation)
+                    .withoutWriterPool()
+                    .build(this)
+        );
+
         this.snfeListener = builder.getSnfeListener();
 
         fileStoreScheduler.scheduleWithFixedDelay(format("TarMK flush [%s]", directory), 5, SECONDS,
@@ -256,7 +236,7 @@ public class FileStore extends AbstractF
     public Runnable getGCRunner() {
         return new SafeRunnable(format("TarMK revision gc [%s]", directory), () -> {
             try (ShutDownCloser ignored = shutDown.keepAlive()) {
-                garbageCollector.run();
+                garbageCollector.run(garbageCollectionStrategy);
             } catch (IOException e) {
                 log.error("Error running revision garbage collection", e);
             }
@@ -340,7 +320,7 @@ public class FileStore extends AbstractF
      */
     public void fullGC() throws IOException {
         try (ShutDownCloser ignored = shutDown.keepAlive()) {
-            garbageCollector.runFull();
+            garbageCollector.runFull(garbageCollectionStrategy);
         }
     }
 
@@ -349,7 +329,7 @@ public class FileStore extends AbstractF
      */
     public void tailGC() throws IOException {
         try (ShutDownCloser ignored = shutDown.keepAlive()) {
-            garbageCollector.runTail();
+            garbageCollector.runTail(garbageCollectionStrategy);
         }
     }
 
@@ -361,13 +341,19 @@ public class FileStore extends AbstractF
      */
     public boolean compactFull() {
         try (ShutDownCloser ignored = shutDown.keepAlive()) {
-            return garbageCollector.compactFull().isSuccess();
+            return garbageCollector.compactFull(garbageCollectionStrategy).isSuccess();
+        } catch (IOException e) {
+            log.warn("Unable to perform full compaction", e);
+            return false;
         }
     }
 
     public boolean compactTail() {
         try (ShutDownCloser ignored = shutDown.keepAlive()) {
-            return garbageCollector.compactTail().isSuccess();
+            return garbageCollector.compactTail(garbageCollectionStrategy).isSuccess();
+        } catch (IOException e) {
+            // Pass the exception so the cause and stack trace are logged,
+            // consistent with compactFull().
+            log.warn("Unable to perform tail compaction", e);
+            return false;
         }
     }
 
@@ -380,7 +366,7 @@ public class FileStore extends AbstractF
      */
     public void cleanup() throws IOException {
         try (ShutDownCloser ignored = shutDown.keepAlive()) {
-            fileReaper.add(garbageCollector.cleanup());
+            fileReaper.add(garbageCollector.cleanup(garbageCollectionStrategy));
         }
     }
 
@@ -539,650 +525,4 @@ public class FileStore extends AbstractF
         }
     }
 
-    private class GarbageCollector {
-
-        @Nonnull
-        private final SegmentGCOptions gcOptions;
-
-        /**
-         * {@code GcListener} listening to this instance's gc progress
-         */
-        @Nonnull
-        private final PrefixedGCListener gcListener;
-
-        @Nonnull
-        private final GCJournal gcJournal;
-
-        @Nonnull
-        private final WriterCacheManager cacheManager;
-
-        @Nonnull
-        private GCNodeWriteMonitor compactionMonitor = GCNodeWriteMonitor.EMPTY;
-
-        private volatile boolean cancelled;
-
-        /**
-         * Timestamp of the last time {@link #fullGC()} or {@link #tailGC()} was
-         * successfully invoked. 0 if never.
-         */
-        private long lastSuccessfullGC;
-
-        /**
-         * Last compaction type used to determine which predicate to use during
-         * {@link #cleanup() cleanup}. Defaults to {@link GCType#FULL FULL}, which is
-         * conservative and safe in case it does not match the real type (e.g. because
-         * of a system restart).
-         */
-        @Nonnull
-        private GCType lastCompactionType = FULL;
-
-        GarbageCollector(
-                @Nonnull SegmentGCOptions gcOptions,
-                @Nonnull GCListener gcListener,
-                @Nonnull GCJournal gcJournal,
-                @Nonnull WriterCacheManager cacheManager) {
-            this.gcOptions = gcOptions;
-            this.gcListener = new PrefixedGCListener(gcListener, GC_COUNT);
-            this.gcJournal = gcJournal;
-            this.cacheManager = cacheManager;
-        }
-
-        GCNodeWriteMonitor getGCNodeWriteMonitor() {
-            return compactionMonitor;
-        }
-
-        synchronized void run() throws IOException {
-            switch (gcOptions.getGCType()) {
-                case FULL:
-                    runFull();
-                    break;
-                case TAIL:
-                    runTail();
-                    break;
-                default:
-                    throw new IllegalStateException("Invalid GC type");
-            }
-        }
-
-        synchronized void runFull() throws IOException {
-            run(true, this::compactFull);
-        }
-
-        synchronized void runTail() throws IOException {
-            run(false, this::compactTail);
-        }
-
-        private void run(boolean full, Supplier<CompactionResult> compact) throws IOException {
-            try {
-                GC_COUNT.incrementAndGet();
-
-                gcListener.info("started");
-
-                long dt = System.currentTimeMillis() - lastSuccessfullGC;
-                if (dt < GC_BACKOFF) {
-                    gcListener.skipped("skipping garbage collection as it already ran " +
-                        "less than {} hours ago ({} s).", GC_BACKOFF / 3600000, dt / 1000);
-                    return;
-                }
-
-                boolean sufficientEstimatedGain = true;
-                if (gcOptions.isEstimationDisabled()) {
-                    gcListener.info("estimation skipped because it was explicitly disabled");
-                } else if (gcOptions.isPaused()) {
-                    gcListener.info("estimation skipped because compaction is paused");
-                } else {
-                    gcListener.info("estimation started");
-                    gcListener.updateStatus(ESTIMATION.message());
-
-                    PrintableStopwatch watch = PrintableStopwatch.createStarted();
-                    GCEstimationResult estimation = estimateCompactionGain(full);
-                    sufficientEstimatedGain = estimation.isGcNeeded();
-                    String gcLog = estimation.getGcLog();
-                    if (sufficientEstimatedGain) {
-                        gcListener.info("estimation completed in {}. {}", watch, gcLog);
-                    } else {
-                        gcListener.skipped("estimation completed in {}. {}", watch, gcLog);
-                    }
-                }
-
-                if (sufficientEstimatedGain) {
-                    try (GCMemoryBarrier ignored = new GCMemoryBarrier(sufficientMemory, gcListener, gcOptions)) {
-                        if (gcOptions.isPaused()) {
-                            gcListener.skipped("compaction paused");
-                        } else if (!sufficientMemory.get()) {
-                            gcListener.skipped("compaction skipped. Not enough memory");
-                        } else {
-                            CompactionResult compactionResult = compact.get();
-                            if (compactionResult.isSuccess()) {
-                                lastSuccessfullGC = System.currentTimeMillis();
-                            } else {
-                                gcListener.info("cleaning up after failed compaction");
-                            }
-                            fileReaper.add(cleanup(compactionResult));
-                        }
-                    }
-                }
-            } finally {
-                compactionMonitor.finished();
-                gcListener.updateStatus(IDLE.message());
-            }
-        }
-
-        /**
-         * Estimated compaction gain. The result will be undefined if stopped through
-         * the passed {@code stop} signal.
-         * @return compaction gain estimate
-         */
-        GCEstimationResult estimateCompactionGain(boolean full) {
-            return new SizeDeltaGcEstimation(gcOptions.getGcSizeDeltaEstimation(), gcJournal, tarFiles.size(), full).estimate();
-        }
-
-        @Nonnull
-        private CompactionResult compactionAborted(@Nonnull GCGeneration generation) {
-            gcListener.compactionFailed(generation);
-            return CompactionResult.aborted(getGcGeneration(), generation);
-        }
-
-        @Nonnull
-        private CompactionResult compactionSucceeded(
-                @Nonnull GCType gcType,
-                @Nonnull GCGeneration generation,
-                @Nonnull RecordId compactedRootId) {
-            gcListener.compactionSucceeded(generation);
-            return CompactionResult.succeeded(gcType, generation, gcOptions, compactedRootId);
-        }
-
-        @CheckForNull
-        private SegmentNodeState getBase() {
-            String root = gcJournal.read().getRoot();
-            RecordId rootId = RecordId.fromString(tracker, root);
-            if (RecordId.NULL.equals(rootId)) {
-                return null;
-            }
-            try {
-                SegmentNodeState node = segmentReader.readNode(rootId);
-                node.getPropertyCount();  // Resilience: fail early with a SNFE if the segment is not there
-                return node;
-            } catch (SegmentNotFoundException snfe) {
-                gcListener.error("base state " + rootId + " is not accessible", snfe);
-                return null;
-            }
-        }
-
-        synchronized CompactionResult compactFull() {
-            gcListener.info("running full compaction");
-            return compact(FULL, EMPTY_NODE, getGcGeneration().nextFull());
-        }
-
-        synchronized CompactionResult compactTail() {
-            gcListener.info("running tail compaction");
-            SegmentNodeState base = getBase();
-            if (base != null) {
-                return compact(TAIL, base, getGcGeneration().nextTail());
-            }
-            gcListener.info("no base state available, running full compaction instead");
-            return compact(FULL, EMPTY_NODE, getGcGeneration().nextFull());
-        }
-
-        private CompactionResult compact(
-                @Nonnull GCType gcType,
-                @Nonnull NodeState base,
-                @Nonnull GCGeneration newGeneration) {
-            try {
-                PrintableStopwatch watch = PrintableStopwatch.createStarted();
-                gcListener.info("compaction started, gc options={}, current generation={}, new generation={}",
-                                gcOptions, getHead().getRecordId().getSegment().getGcGeneration(), newGeneration);
-                gcListener.updateStatus(COMPACTION.message());
-
-                GCJournalEntry gcEntry = gcJournal.read();
-                long initialSize = size();
-
-                SegmentWriter writer = defaultSegmentWriterBuilder("c")
-                        .with(cacheManager)
-                        .withGeneration(newGeneration)
-                        .withoutWriterPool()
-                        .build(FileStore.this);
-
-                CancelCompactionSupplier cancel = new CancelCompactionSupplier(FileStore.this);
-
-                compactionMonitor = new GCNodeWriteMonitor(gcOptions.getGcLogInterval(), gcListener);
-                compactionMonitor.init(gcEntry.getRepoSize(), gcEntry.getNodes(), initialSize);
-
-                CheckpointCompactor compactor = new CheckpointCompactor(gcListener,
-                    segmentReader, writer, getBlobStore(), cancel, compactionMonitor);
-
-                SegmentNodeState head = getHead();
-                SegmentNodeState compacted = compactor.compact(base, head, base);
-                if (compacted == null) {
-                    gcListener.warn("compaction cancelled: {}.", cancel);
-                    return compactionAborted(newGeneration);
-                }
-
-                gcListener.info("compaction cycle 0 completed in {}. Compacted {} to {}",
-                    watch, head.getRecordId(), compacted.getRecordId());
-
-                int cycles = 0;
-                boolean success = false;
-                SegmentNodeState previousHead = head;
-                while (cycles < gcOptions.getRetryCount() &&
-                        !(success = revisions.setHead(previousHead.getRecordId(), compacted.getRecordId(), EXPEDITE_OPTION))) {
-                    // Some other concurrent changes have been made.
-                    // Rebase (and compact) those changes on top of the
-                    // compacted state before retrying to set the head.
-                    cycles++;
-                    gcListener.info("compaction detected concurrent commits while compacting. " +
-                            "Compacting these commits. Cycle {} of {}",
-                        cycles, gcOptions.getRetryCount());
-                    gcListener.updateStatus(COMPACTION_RETRY.message() + cycles);
-                    PrintableStopwatch cycleWatch = PrintableStopwatch.createStarted();
-
-                    head = getHead();
-                    compacted = compactor.compact(previousHead, head, compacted);
-                    if (compacted == null) {
-                        gcListener.warn("compaction cancelled: {}.", cancel);
-                        return compactionAborted(newGeneration);
-                    }
-
-                    gcListener.info("compaction cycle {} completed in {}. Compacted {} against {} to {}",
-                        cycles, cycleWatch, head.getRecordId(), previousHead.getRecordId(), compacted.getRecordId());
-                    previousHead = head;
-                }
-
-                if (!success) {
-                    gcListener.info("compaction gave up compacting concurrent commits after {} cycles.",
-                        cycles);
-                    int forceTimeout = gcOptions.getForceTimeout();
-                    if (forceTimeout > 0) {
-                        gcListener.info("trying to force compact remaining commits for {} seconds. " +
-                                "Concurrent commits to the store will be blocked.",
-                            forceTimeout);
-                        gcListener.updateStatus(COMPACTION_FORCE_COMPACT.message());
-                        PrintableStopwatch forceWatch = PrintableStopwatch.createStarted();
-
-                        cycles++;
-                        cancel.timeOutAfter(forceTimeout, SECONDS);
-                        compacted = forceCompact(previousHead, compacted, compactor);
-                        success = compacted != null;
-                        if (success) {
-                            gcListener.info("compaction succeeded to force compact remaining commits " +
-                                "after {}.", forceWatch);
-                        } else {
-                            if (cancel.get()) {
-                                gcListener.warn("compaction failed to force compact remaining commits " +
-                                        "after {}. Compaction was cancelled: {}.",
-                                    forceWatch, cancel);
-                            } else {
-                                gcListener.warn("compaction failed to force compact remaining commits. " +
-                                        "after {}. Could not acquire exclusive access to the node store.",
-                                    forceWatch);
-                            }
-                        }
-                    }
-                }
-
-                if (success) {
-                    // Update type of the last compaction before calling methods that could throw an exception.
-                    lastCompactionType = gcType;
-                    writer.flush();
-                    flush();
-                    gcListener.info("compaction succeeded in {}, after {} cycles", watch, cycles);
-                    return compactionSucceeded(gcType, newGeneration, compacted.getRecordId());
-                } else {
-                    gcListener.info("compaction failed after {}, and {} cycles", watch, cycles);
-                    return compactionAborted(newGeneration);
-                }
-            } catch (InterruptedException e) {
-                gcListener.error("compaction interrupted", e);
-                currentThread().interrupt();
-                return compactionAborted(newGeneration);
-            } catch (IOException e) {
-                gcListener.error("compaction encountered an error", e);
-                return compactionAborted(newGeneration);
-            }
-        }
-
-        private SegmentNodeState forceCompact(
-                @Nonnull final NodeState base,
-                @Nonnull final NodeState onto,
-                @Nonnull final CheckpointCompactor compactor)
-        throws InterruptedException {
-            RecordId compactedId = revisions.setHead(new Function<RecordId, RecordId>() {
-                @Nullable
-                @Override
-                public RecordId apply(RecordId headId) {
-                    try {
-                        long t0 = currentTimeMillis();
-                        SegmentNodeState after = compactor.compact(
-                            base, segmentReader.readNode(headId), onto);
-                        if (after == null) {
-                            gcListener.info("compaction cancelled after {} seconds",
-                                (currentTimeMillis() - t0) / 1000);
-                            return null;
-                        } else {
-                            return after.getRecordId();
-                        }
-                    } catch (IOException e) {
-                        gcListener.error("error during forced compaction.", e);
-                        return null;
-                    }
-                }
-            }, timeout(gcOptions.getForceTimeout(), SECONDS));
-            return compactedId != null
-                ? segmentReader.readNode(compactedId)
-                : null;
-        }
-
-        private CleanupContext newCleanupContext(Predicate<GCGeneration> old) {
-            return new CleanupContext() {
-
-                private boolean isUnreferencedBulkSegment(UUID id, boolean referenced) {
-                    return !isDataSegmentId(id.getLeastSignificantBits()) && !referenced;
-                }
-
-                private boolean isOldDataSegment(UUID id, GCGeneration generation) {
-                    return isDataSegmentId(id.getLeastSignificantBits()) && old.apply(generation);
-                }
-
-                @Override
-                public Collection<UUID> initialReferences() {
-                    Set<UUID> references = newHashSet();
-                    for (SegmentId id : tracker.getReferencedSegmentIds()) {
-                        if (id.isBulkSegmentId()) {
-                            references.add(id.asUUID());
-                        }
-                    }
-                    return references;
-                }
-
-                @Override
-                public boolean shouldReclaim(UUID id, GCGeneration generation, boolean referenced) {
-                    return isUnreferencedBulkSegment(id, referenced) || isOldDataSegment(id, generation);
-                }
-
-                @Override
-                public boolean shouldFollow(UUID from, UUID to) {
-                    return !isDataSegmentId(to.getLeastSignificantBits());
-                }
-
-            };
-        }
-
-        /**
-         * Cleanup segments whose generation matches the reclaim predicate determined by
-         * the {@link #lastCompactionType last successful compaction}.
-         * @return list of files to be removed
-         * @throws IOException
-         */
-        @Nonnull
-        synchronized List<String> cleanup() throws IOException {
-            return cleanup(CompactionResult.skipped(
-                lastCompactionType,
-                getGcGeneration(),
-                garbageCollector.gcOptions,
-                revisions.getHead()
-            ));
-        }
-
-        /**
-         * Cleanup segments whose generation matches the {@link CompactionResult#reclaimer()} predicate.
-         * @return list of files to be removed
-         * @throws IOException
-         */
-        @Nonnull
-        private List<String> cleanup(@Nonnull CompactionResult compactionResult)
-            throws IOException {
-            PrintableStopwatch watch = PrintableStopwatch.createStarted();
-
-            gcListener.info("cleanup started using reclaimer {}", compactionResult.reclaimer());
-            gcListener.updateStatus(CLEANUP.message());
-            segmentCache.clear();
-
-            // Suggest to the JVM that now would be a good time
-            // to clear stale weak references in the SegmentTracker
-            System.gc();
-
-            TarFiles.CleanupResult cleanupResult = tarFiles.cleanup(newCleanupContext(compactionResult.reclaimer()));
-            if (cleanupResult.isInterrupted()) {
-                gcListener.info("cleanup interrupted");
-            }
-            tracker.clearSegmentIdTables(cleanupResult.getReclaimedSegmentIds(), compactionResult.gcInfo());
-            gcListener.info("cleanup marking files for deletion: {}", toFileNames(cleanupResult.getRemovableFiles()));
-
-            long finalSize = size();
-            long reclaimedSize = cleanupResult.getReclaimedSize();
-            stats.reclaimed(reclaimedSize);
-            gcJournal.persist(reclaimedSize, finalSize, getGcGeneration(),
-                    compactionMonitor.getCompactedNodes(),
-                    compactionResult.getCompactedRootId().toString10());
-            gcListener.cleaned(reclaimedSize, finalSize);
-            gcListener.info(
-                "cleanup completed in {}. Post cleanup size is {} and space reclaimed {}.",
-                watch,
-                newPrintableBytes(finalSize),
-                newPrintableBytes(reclaimedSize));
-            return cleanupResult.getRemovableFiles();
-        }
-
-        private String toFileNames(@Nonnull List<String> files) {
-            if (files.isEmpty()) {
-                return "none";
-            } else {
-                return Joiner.on(",").join(files);
-            }
-        }
-
-        /**
-         * Finds all external blob references that are currently accessible
-         * in this repository and adds them to the given collector. Useful
-         * for collecting garbage in an external data store.
-         * <p>
-         * Note that this method only collects blob references that are already
-         * stored in the repository (at the time when this method is called), so
-         * the garbage collector will need some other mechanism for tracking
-         * in-memory references and references stored while this method is
-         * running.
-         * @param collector  reference collector called back for each blob reference found
-         */
-        synchronized void collectBlobReferences(Consumer<String> collector) throws IOException {
-            segmentWriter.flush();
-            tarFiles.collectBlobReferences(collector,
-                    newOldReclaimer(lastCompactionType, getGcGeneration(), gcOptions.getRetainedGenerations()));
-        }
-
-        void cancel() {
-            cancelled = true;
-        }
-
-        /**
-         * Represents the cancellation policy for the compaction phase. If the disk
-         * space was considered insufficient at least once during compaction (or if
-         * the space was never sufficient to begin with), compaction is considered
-         * canceled. Furthermore when the file store is shutting down, compaction is
-         * considered canceled.
-         * Finally the cancellation can be triggered by a timeout that can be set
-         * at any time.
-         */
-        private class CancelCompactionSupplier implements Supplier<Boolean> {
-            private final FileStore store;
-
-            private String reason;
-            private volatile long baseLine;
-            private volatile long deadline;
-
-            public CancelCompactionSupplier(@Nonnull FileStore store) {
-                cancelled = false;
-                this.store = store;
-            }
-
-            /**
-             * Set a timeout for cancellation. Setting a different timeout cancels
-             * a previous one that did not yet elapse. Setting a timeout after
-             * cancellation took place has no effect.
-             */
-            public void timeOutAfter(final long duration, @Nonnull final TimeUnit unit) {
-                baseLine = currentTimeMillis();
-                deadline = baseLine + MILLISECONDS.convert(duration, unit);
-            }
-
-            @Override
-            public Boolean get() {
-                // The outOfDiskSpace and shutdown flags can only transition from
-                // false (their initial values), to true. Once true, there should
-                // be no way to go back.
-                if (!store.sufficientDiskSpace.get()) {
-                    reason = "Not enough disk space";
-                    return true;
-                }
-                if (!store.sufficientMemory.get()) {
-                    reason = "Not enough memory";
-                    return true;
-                }
-                if (store.shutDown.isShutDown()) {
-                    reason = "The FileStore is shutting down";
-                    return true;
-                }
-                if (cancelled) {
-                    reason = "Cancelled by user";
-                    return true;
-                }
-                if (deadline > 0 && currentTimeMillis() > deadline) {
-                    long dt = SECONDS.convert(currentTimeMillis() - baseLine, MILLISECONDS);
-                    reason = "Timeout after " + dt + " seconds";
-                    return true;
-                }
-                return false;
-            }
-
-            @Override
-            public String toString() { return reason; }
-        }
-    }
-
-    /**
-     * Instances of this class represent the result from a compaction. Either
-     * {@link #succeeded(GCType, GCGeneration, SegmentGCOptions, RecordId) succeeded},
-     * {@link #aborted(GCGeneration, GCGeneration) aborted} or {@link
-     * #skipped(GCType, GCGeneration, SegmentGCOptions, RecordId) skipped}.
-     */
-    private abstract static class CompactionResult {
-        @Nonnull
-        private final GCGeneration currentGeneration;
-
-        protected CompactionResult(@Nonnull GCGeneration currentGeneration) {
-            this.currentGeneration = currentGeneration;
-        }
-
-        /**
-         * Result of a succeeded compaction.
-         * @param gcType            the type of the succeeded compaction operation
-         * @param newGeneration     the generation successfully created by compaction
-         * @param gcOptions         the current GC options used by compaction
-         * @param compactedRootId   the record id of the root created by compaction
-         */
-        static CompactionResult succeeded(
-                @Nonnull GCType gcType,
-                @Nonnull GCGeneration newGeneration,
-                @Nonnull final SegmentGCOptions gcOptions,
-                @Nonnull final RecordId compactedRootId) {
-            return new CompactionResult(newGeneration) {
-                @Override
-                Predicate<GCGeneration> reclaimer() {
-                    return newOldReclaimer(gcType, newGeneration, gcOptions.getRetainedGenerations());
-                }
-
-                @Override
-                boolean isSuccess() {
-                    return true;
-                }
-
-                @Override
-                RecordId getCompactedRootId() {
-                    return compactedRootId;
-                }
-            };
-        }
-
-        /**
-         * Result of an aborted compaction.
-         * @param currentGeneration  the current generation of the store
-         * @param failedGeneration   the generation that compaction attempted to create
-         */
-        static CompactionResult aborted(
-                @Nonnull GCGeneration currentGeneration,
-                @Nonnull final GCGeneration failedGeneration) {
-            return new CompactionResult(currentGeneration) {
-                @Override
-                Predicate<GCGeneration> reclaimer() {
-                    return Reclaimers.newExactReclaimer(failedGeneration);
-                }
-
-                @Override
-                boolean isSuccess() {
-                    return false;
-                }
-            };
-        }
-
-        /**
-         * Result serving as a placeholder for a compaction that was skipped.
-         * @param lastGCType         type of the most recent gc operation. {@link GCType#FULL} if none.
-         * @param currentGeneration  the current generation of the store
-         * @param gcOptions          the current GC options used by compaction
-         * @param compactedRootId    the record id of the root created by the last successful compaction
-         */
-        static CompactionResult skipped(
-                @Nonnull GCType lastGCType,
-                @Nonnull GCGeneration currentGeneration,
-                @Nonnull final SegmentGCOptions gcOptions,
-                @Nonnull final RecordId compactedRootId) {
-            return new CompactionResult(currentGeneration) {
-                @Override
-                Predicate<GCGeneration> reclaimer() {
-                    return Reclaimers.newOldReclaimer(lastGCType, currentGeneration, gcOptions.getRetainedGenerations());
-                }
-
-                @Override
-                boolean isSuccess() {
-                    return true;
-                }
-
-                @Override
-                RecordId getCompactedRootId() {
-                    return compactedRootId;
-                }
-            };
-        }
-
-        /**
-         * @return  a predicate determining which segments to
-         *          {@link GarbageCollector#cleanup(CompactionResult) clean up} for
-         *          the given compaction result.
-         */
-        abstract Predicate<GCGeneration> reclaimer();
-
-        /**
-         * @return  {@code true} for {@link #succeeded(GCType, GCGeneration, SegmentGCOptions, RecordId) succeeded}
-         *          and {@link #skipped(GCType, GCGeneration, SegmentGCOptions, RecordId) skipped}, {@code false} otherwise.
-         */
-        abstract boolean isSuccess();
-
-        /**
-         * @return  the record id of the compacted root on {@link #isSuccess() success},
-         *          {@link RecordId#NULL} otherwise.
-         */
-        RecordId getCompactedRootId() {
-            return RecordId.NULL;
-        }
-
-        /**
-         * @return  a diagnostic message describing the outcome of this compaction.
-         */
-        String gcInfo() {
-            return  "gc-count=" + GC_COUNT +
-                    ",gc-status=" + (isSuccess() ? "success" : "failed") +
-                    ",store-generation=" + currentGeneration +
-                    ",reclaim-predicate=" + reclaimer();
-        }
-
-    }
-
 }

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java?rev=1829859&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java Mon Apr 23 11:47:24 2018
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.jackrabbit.oak.segment.Revisions;
+import org.apache.jackrabbit.oak.segment.SegmentCache;
+import org.apache.jackrabbit.oak.segment.SegmentReader;
+import org.apache.jackrabbit.oak.segment.SegmentTracker;
+import org.apache.jackrabbit.oak.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+
+interface GarbageCollectionStrategy {
+
+    interface SuccessfulGarbageCollectionListener {
+
+        void onSuccessfulGarbageCollection();
+
+    }
+
+    interface SuccessfulCompactionListener {
+
+        void onSuccessfulCompaction(SegmentGCOptions.GCType type);
+
+    }
+
+    interface SegmentWriterFactory {
+
+        SegmentWriter newSegmentWriter(GCGeneration generation);
+
+    }
+
+    interface Context {
+
+        SegmentGCOptions getGCOptions();
+
+        GCListener getGCListener();
+
+        Revisions getRevisions();
+
+        GCJournal getGCJournal();
+
+        SegmentTracker getSegmentTracker();
+
+        SegmentWriterFactory getSegmentWriterFactory();
+
+        GCNodeWriteMonitor getCompactionMonitor();
+
+        BlobStore getBlobStore();
+
+        CancelCompactionSupplier getCanceller();
+
+        long getLastSuccessfulGC();
+
+        TarFiles getTarFiles();
+
+        AtomicBoolean getSufficientMemory();
+
+        FileReaper getFileReaper();
+
+        SuccessfulGarbageCollectionListener getSuccessfulGarbageCollectionListener();
+
+        SuccessfulCompactionListener getSuccessfulCompactionListener();
+
+        Flusher getFlusher();
+
+        long getGCBackOff();
+
+        SegmentGCOptions.GCType getLastCompactionType();
+
+        int getGCCount();
+
+        SegmentCache getSegmentCache();
+
+        FileStoreStats getFileStoreStats();
+
+        SegmentReader getSegmentReader();
+
+    }
+
+    void collectGarbage(Context context) throws IOException;
+
+    void collectFullGarbage(Context context) throws IOException;
+
+    void collectTailGarbage(Context context) throws IOException;
+
+    CompactionResult compactFull(Context context) throws IOException;
+
+    CompactionResult compactTail(Context context) throws IOException;
+
+    List<String> cleanup(Context context) throws IOException;
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native