Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2018/04/23 11:47:24 UTC
svn commit: r1829859 [1/2] - /jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/
Author: frm
Date: Mon Apr 23 11:47:24 2018
New Revision: 1829859
URL: http://svn.apache.org/viewvc?rev=1829859&view=rev
Log:
OAK-7377 - Allow multiple GC implementations in FileStore
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CancelCompactionSupplier.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java (with props)
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollector.java (with props)
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/PrefixedGCListener.java
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CancelCompactionSupplier.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CancelCompactionSupplier.java?rev=1829859&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CancelCompactionSupplier.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CancelCompactionSupplier.java Mon Apr 23 11:47:24 2018
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Represents the cancellation policy for the compaction phase. If the disk
+ * space was considered insufficient at least once during compaction (or if the
+ * space was never sufficient to begin with), compaction is considered canceled.
+ * Furthermore, when the file store is shutting down, compaction is considered
+ * canceled. Finally, the cancellation can be triggered by a timeout that can
+ * be set at any time.
+ */
+class CancelCompactionSupplier implements Supplier<Boolean> {
+
+ private final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+ private final BooleanSupplier diskSpaceExhausted;
+
+ private final BooleanSupplier memoryExhausted;
+
+ private final BooleanSupplier shutDown;
+
+ private String reason;
+
+ private volatile long baseLine;
+
+ private volatile long deadline;
+
+ CancelCompactionSupplier(
+ BooleanSupplier diskSpaceExhausted,
+ BooleanSupplier memoryExhausted,
+ BooleanSupplier shutDown
+ ) {
+ this.diskSpaceExhausted = diskSpaceExhausted;
+ this.memoryExhausted = memoryExhausted;
+ this.shutDown = shutDown;
+ }
+
+ /**
+ * Set a timeout for cancellation. Setting a different timeout cancels a
+ * previous one that did not yet elapse. Setting a timeout after
+ * cancellation took place has no effect.
+ */
+ public void timeOutAfter(final long duration, @Nonnull final TimeUnit unit) {
+ baseLine = currentTimeMillis();
+ deadline = baseLine + MILLISECONDS.convert(duration, unit);
+ }
+
+ @Override
+ public Boolean get() {
+ // The diskSpaceExhausted, memoryExhausted and shutDown suppliers can
+ // only transition from false (their initial values) to true. Once
+ // true, there should be no way to go back.
+ if (diskSpaceExhausted.getAsBoolean()) {
+ reason = "Not enough disk space";
+ return true;
+ }
+ if (memoryExhausted.getAsBoolean()) {
+ reason = "Not enough memory";
+ return true;
+ }
+ if (shutDown.getAsBoolean()) {
+ reason = "The FileStore is shutting down";
+ return true;
+ }
+ if (cancelled.get()) {
+ reason = "Cancelled by user";
+ return true;
+ }
+ if (deadline > 0 && currentTimeMillis() > deadline) {
+ long dt = SECONDS.convert(currentTimeMillis() - baseLine, MILLISECONDS);
+ reason = "Timeout after " + dt + " seconds";
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return reason;
+ }
+
+ public void cancel() {
+ cancelled.set(true);
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CancelCompactionSupplier.java
------------------------------------------------------------------------------
svn:eol-style = native
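The refactoring decouples the cancellation policy from FileStore: the three conditions are injected as BooleanSuppliers instead of being read from FileStore fields. A minimal usage sketch, with the flags below as hypothetical stand-ins for the store's sufficientDiskSpace, sufficientMemory and shutDown state (the class is package-private, so such code would live in the same package):

    // Hypothetical wiring; the three flags stand in for the real store state.
    AtomicBoolean diskFull = new AtomicBoolean(false);
    AtomicBoolean memoryLow = new AtomicBoolean(false);
    AtomicBoolean shuttingDown = new AtomicBoolean(false);

    CancelCompactionSupplier cancel = new CancelCompactionSupplier(
        diskFull::get,       // diskSpaceExhausted
        memoryLow::get,      // memoryExhausted
        shuttingDown::get    // shutDown
    );

    // Optionally also cancel after a wall-clock deadline.
    cancel.timeOutAfter(60, TimeUnit.SECONDS);

    while (!cancel.get()) {
        // ... perform one unit of compaction work ...
    }
    // toString() now reports the reason, e.g. "Timeout after 60 seconds".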
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java?rev=1829859&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java Mon Apr 23 11:47:24 2018
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static org.apache.jackrabbit.oak.segment.file.Reclaimers.newOldReclaimer;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Predicate;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+
+/**
+ * Instances of this class represent the result of a compaction: either
+ * succeeded, aborted, or skipped.
+ */
+abstract class CompactionResult {
+
+ @Nonnull
+ private final GCGeneration currentGeneration;
+
+ private final int gcCount;
+
+ private CompactionResult(@Nonnull GCGeneration currentGeneration, int gcCount) {
+ this.currentGeneration = currentGeneration;
+ this.gcCount = gcCount;
+ }
+
+ /**
+ * Result of a succeeded compaction.
+ *
+ * @param gcType the type of the succeeded compaction operation
+ * @param newGeneration the generation successfully created by compaction
+ * @param gcOptions the current GC options used by compaction
+ * @param compactedRootId the record id of the root created by compaction
+ */
+ static CompactionResult succeeded(
+ @Nonnull SegmentGCOptions.GCType gcType,
+ @Nonnull GCGeneration newGeneration,
+ @Nonnull final SegmentGCOptions gcOptions,
+ @Nonnull final RecordId compactedRootId,
+ int gcCount
+ ) {
+ return new CompactionResult(newGeneration, gcCount) {
+
+ @Override
+ Predicate<GCGeneration> reclaimer() {
+ return newOldReclaimer(gcType, newGeneration, gcOptions.getRetainedGenerations());
+ }
+
+ @Override
+ boolean isSuccess() {
+ return true;
+ }
+
+ @Override
+ RecordId getCompactedRootId() {
+ return compactedRootId;
+ }
+ };
+ }
+
+ /**
+ * Result of an aborted compaction.
+ *
+ * @param currentGeneration the current generation of the store
+ * @param failedGeneration the generation that compaction attempted to
+ * create
+ */
+ static CompactionResult aborted(
+ @Nonnull GCGeneration currentGeneration,
+ @Nonnull final GCGeneration failedGeneration,
+ int gcCount
+ ) {
+ return new CompactionResult(currentGeneration, gcCount) {
+
+ @Override
+ Predicate<GCGeneration> reclaimer() {
+ return Reclaimers.newExactReclaimer(failedGeneration);
+ }
+
+ @Override
+ boolean isSuccess() {
+ return false;
+ }
+ };
+ }
+
+ /**
+ * Result serving as a placeholder for a compaction that was skipped.
+ *
+ * @param lastGCType type of the most recent gc operation. {@link
+ * SegmentGCOptions.GCType#FULL} if none.
+ * @param currentGeneration the current generation of the store
+ * @param gcOptions the current GC options used by compaction
+ */
+ static CompactionResult skipped(
+ @Nonnull SegmentGCOptions.GCType lastGCType,
+ @Nonnull GCGeneration currentGeneration,
+ @Nonnull final SegmentGCOptions gcOptions,
+ @Nonnull final RecordId compactedRootId,
+ int gcCount
+ ) {
+ return new CompactionResult(currentGeneration, gcCount) {
+
+ @Override
+ Predicate<GCGeneration> reclaimer() {
+ return Reclaimers.newOldReclaimer(lastGCType, currentGeneration, gcOptions.getRetainedGenerations());
+ }
+
+ @Override
+ boolean isSuccess() {
+ return true;
+ }
+
+ @Override
+ RecordId getCompactedRootId() {
+ return compactedRootId;
+ }
+ };
+ }
+
+ /**
+ * @return a predicate determining which segments to clean up for the
+ * given compaction result.
+ */
+ abstract Predicate<GCGeneration> reclaimer();
+
+ /**
+ * @return {@code true} for succeeded and skipped, {@code false} otherwise.
+ */
+ abstract boolean isSuccess();
+
+ /**
+ * @return the record id of the compacted root on {@link #isSuccess()
+ * success}, {@link RecordId#NULL} otherwise.
+ */
+ RecordId getCompactedRootId() {
+ return RecordId.NULL;
+ }
+
+ /**
+ * @return a diagnostic message describing the outcome of this compaction.
+ */
+ String gcInfo() {
+ return String.format(
+ "gc-count=%d,gc-status=%s,store-generation=%s,reclaim-predicate=%s",
+ gcCount,
+ isSuccess() ? "success" : "failed",
+ currentGeneration,
+ reclaimer()
+ );
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/CompactionResult.java
------------------------------------------------------------------------------
svn:eol-style = native
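Each factory method couples the success flag with the reclaimer predicate that the subsequent cleanup phase should apply. A sketch of consuming a result, with placeholder inputs (in the real flow they come from the store's revisions and the GC journal; Predicate is Guava's):

    CompactionResult result = CompactionResult.aborted(
        currentGeneration,   // generation the store is still on
        failedGeneration,    // generation the aborted compaction targeted
        gcCount
    );

    if (!result.isSuccess()) {
        // An exact reclaimer: only segments of the failed generation
        // become eligible for cleanup.
        Predicate<GCGeneration> reclaim = result.reclaimer();
        // ... hand the predicate to the cleanup phase ...
    }

    // Diagnostic summary, e.g.
    // "gc-count=7,gc-status=failed,store-generation=...,reclaim-predicate=..."
    String info = result.gcInfo();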
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java?rev=1829859&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java Mon Apr 23 11:47:24 2018
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static com.google.common.collect.Sets.newHashSet;
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType.FULL;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType.TAIL;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.CLEANUP;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION_FORCE_COMPACT;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION_RETRY;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.ESTIMATION;
+import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.IDLE;
+import static org.apache.jackrabbit.oak.segment.file.PrintableBytes.newPrintableBytes;
+import static org.apache.jackrabbit.oak.segment.file.TarRevisions.EXPEDITE_OPTION;
+import static org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.annotation.Nonnull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
+import org.apache.jackrabbit.oak.segment.CheckpointCompactor;
+import org.apache.jackrabbit.oak.segment.RecordId;
+import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.SegmentNodeState;
+import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
+import org.apache.jackrabbit.oak.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
+import org.apache.jackrabbit.oak.segment.file.tar.CleanupContext;
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+class DefaultGarbageCollectionStrategy implements GarbageCollectionStrategy {
+
+ private GCGeneration getGcGeneration(Context context) {
+ return context.getRevisions().getHead().getSegmentId().getGcGeneration();
+ }
+
+ private SegmentNodeState getBase(Context context) {
+ String root = context.getGCJournal().read().getRoot();
+ RecordId rootId = RecordId.fromString(context.getSegmentTracker(), root);
+ if (RecordId.NULL.equals(rootId)) {
+ return null;
+ }
+ try {
+ SegmentNodeState node = context.getSegmentReader().readNode(rootId);
+ node.getPropertyCount(); // Resilience: fail early with a SNFE if the segment is not there
+ return node;
+ } catch (SegmentNotFoundException snfe) {
+ context.getGCListener().error("base state " + rootId + " is not accessible", snfe);
+ return null;
+ }
+ }
+
+ private SegmentNodeState getHead(Context context) {
+ return context.getSegmentReader().readHeadState(context.getRevisions());
+ }
+
+ @Override
+ public synchronized void collectGarbage(Context context) throws IOException {
+ switch (context.getGCOptions().getGCType()) {
+ case FULL:
+ collectFullGarbage(context);
+ break;
+ case TAIL:
+ collectTailGarbage(context);
+ break;
+ default:
+ throw new IllegalStateException("Invalid GC type");
+ }
+ }
+
+ @Override
+ public synchronized void collectFullGarbage(Context context) throws IOException {
+ run(context, true, this::compactFull);
+ }
+
+ @Override
+ public synchronized void collectTailGarbage(Context context) throws IOException {
+ run(context, false, this::compactTail);
+ }
+
+ private interface Compactor {
+
+ CompactionResult compact(Context context) throws IOException;
+
+ }
+
+ private void run(Context context, boolean full, Compactor compact) throws IOException {
+ try {
+ context.getGCListener().info("started");
+
+ long dt = System.currentTimeMillis() - context.getLastSuccessfulGC();
+
+ if (dt < context.getGCBackOff()) {
+ context.getGCListener().skipped("skipping garbage collection as it already ran less than {} hours ago ({} s).", context.getGCBackOff() / 3600000, dt / 1000);
+ return;
+ }
+
+ boolean sufficientEstimatedGain = true;
+ if (context.getGCOptions().isEstimationDisabled()) {
+ context.getGCListener().info("estimation skipped because it was explicitly disabled");
+ } else if (context.getGCOptions().isPaused()) {
+ context.getGCListener().info("estimation skipped because compaction is paused");
+ } else {
+ context.getGCListener().info("estimation started");
+ context.getGCListener().updateStatus(ESTIMATION.message());
+
+ PrintableStopwatch watch = PrintableStopwatch.createStarted();
+ GCEstimationResult estimation = estimateCompactionGain(context, full);
+ sufficientEstimatedGain = estimation.isGcNeeded();
+ String gcLog = estimation.getGcLog();
+ if (sufficientEstimatedGain) {
+ context.getGCListener().info("estimation completed in {}. {}", watch, gcLog);
+ } else {
+ context.getGCListener().skipped("estimation completed in {}. {}", watch, gcLog);
+ }
+ }
+
+ if (sufficientEstimatedGain) {
+ try (GCMemoryBarrier ignored = new GCMemoryBarrier(context.getSufficientMemory(), context.getGCListener(), context.getGCOptions())) {
+ if (context.getGCOptions().isPaused()) {
+ context.getGCListener().skipped("compaction paused");
+ } else if (!context.getSufficientMemory().get()) {
+ context.getGCListener().skipped("compaction skipped. Not enough memory");
+ } else {
+ CompactionResult compactionResult = compact.compact(context);
+ if (compactionResult.isSuccess()) {
+ context.getSuccessfulGarbageCollectionListener().onSuccessfulGarbageCollection();
+ } else {
+ context.getGCListener().info("cleaning up after failed compaction");
+ }
+ context.getFileReaper().add(cleanup(context, compactionResult));
+ }
+ }
+ }
+ } finally {
+ context.getCompactionMonitor().finished();
+ context.getGCListener().updateStatus(IDLE.message());
+ }
+ }
+
+ @Override
+ public synchronized CompactionResult compactFull(Context context) {
+ context.getGCListener().info("running full compaction");
+ return compact(context, FULL, EMPTY_NODE, getGcGeneration(context).nextFull());
+ }
+
+ @Override
+ public synchronized CompactionResult compactTail(Context context) {
+ context.getGCListener().info("running tail compaction");
+ SegmentNodeState base = getBase(context);
+ if (base != null) {
+ return compact(context, TAIL, base, getGcGeneration(context).nextTail());
+ }
+ context.getGCListener().info("no base state available, running full compaction instead");
+ return compact(context, FULL, EMPTY_NODE, getGcGeneration(context).nextFull());
+ }
+
+ private CompactionResult compact(Context context, SegmentGCOptions.GCType gcType, NodeState base, GCGeneration newGeneration) {
+ try {
+ PrintableStopwatch watch = PrintableStopwatch.createStarted();
+ context.getGCListener().info(
+ "compaction started, gc options={}, current generation={}, new generation={}",
+ context.getGCOptions(),
+ getHead(context).getRecordId().getSegment().getGcGeneration(),
+ newGeneration
+ );
+ context.getGCListener().updateStatus(COMPACTION.message());
+
+ GCJournal.GCJournalEntry gcEntry = context.getGCJournal().read();
+ long initialSize = size(context);
+
+ SegmentWriter writer = context.getSegmentWriterFactory().newSegmentWriter(newGeneration);
+
+ context.getCompactionMonitor().init(gcEntry.getRepoSize(), gcEntry.getNodes(), initialSize);
+
+ CheckpointCompactor compactor = new CheckpointCompactor(
+ context.getGCListener(),
+ context.getSegmentReader(),
+ writer,
+ context.getBlobStore(),
+ context.getCanceller(),
+ context.getCompactionMonitor()
+ );
+
+ SegmentNodeState head = getHead(context);
+ SegmentNodeState compacted = compactor.compact(base, head, base);
+ if (compacted == null) {
+ context.getGCListener().warn("compaction cancelled: {}.", context.getCanceller());
+ return compactionAborted(context, newGeneration);
+ }
+
+ context.getGCListener().info("compaction cycle 0 completed in {}. Compacted {} to {}",
+ watch, head.getRecordId(), compacted.getRecordId());
+
+ int cycles = 0;
+ boolean success = false;
+ SegmentNodeState previousHead = head;
+ while (cycles < context.getGCOptions().getRetryCount() &&
+ !(success = context.getRevisions().setHead(previousHead.getRecordId(), compacted.getRecordId(), EXPEDITE_OPTION))) {
+ // Some other concurrent changes have been made.
+ // Rebase (and compact) those changes on top of the
+ // compacted state before retrying to set the head.
+ cycles++;
+ context.getGCListener().info("compaction detected concurrent commits while compacting. " +
+ "Compacting these commits. Cycle {} of {}",
+ cycles, context.getGCOptions().getRetryCount());
+ context.getGCListener().updateStatus(COMPACTION_RETRY.message() + cycles);
+ PrintableStopwatch cycleWatch = PrintableStopwatch.createStarted();
+
+ head = getHead(context);
+ compacted = compactor.compact(previousHead, head, compacted);
+ if (compacted == null) {
+ context.getGCListener().warn("compaction cancelled: {}.", context.getCanceller());
+ return compactionAborted(context, newGeneration);
+ }
+
+ context.getGCListener().info("compaction cycle {} completed in {}. Compacted {} against {} to {}",
+ cycles, cycleWatch, head.getRecordId(), previousHead.getRecordId(), compacted.getRecordId());
+ previousHead = head;
+ }
+
+ if (!success) {
+ context.getGCListener().info("compaction gave up compacting concurrent commits after {} cycles.", cycles);
+ int forceTimeout = context.getGCOptions().getForceTimeout();
+ if (forceTimeout > 0) {
+ context.getGCListener().info("trying to force compact remaining commits for {} seconds. " +
+ "Concurrent commits to the store will be blocked.",
+ forceTimeout);
+ context.getGCListener().updateStatus(COMPACTION_FORCE_COMPACT.message());
+ PrintableStopwatch forceWatch = PrintableStopwatch.createStarted();
+
+ cycles++;
+ context.getCanceller().timeOutAfter(forceTimeout, SECONDS);
+ compacted = forceCompact(context, previousHead, compacted, compactor);
+ success = compacted != null;
+ if (success) {
+ context.getGCListener().info("compaction succeeded to force compact remaining commits after {}.", forceWatch);
+ } else {
+ if (context.getCanceller().get()) {
+ context.getGCListener().warn("compaction failed to force compact remaining commits " +
+ "after {}. Compaction was cancelled: {}.",
+ forceWatch, context.getCanceller());
+ } else {
+ context.getGCListener().warn("compaction failed to force compact remaining commits. " +
+ "after {}. Could not acquire exclusive access to the node store.",
+ forceWatch);
+ }
+ }
+ }
+ }
+
+ if (success) {
+ // Update type of the last compaction before calling methods that could throw an exception.
+ context.getSuccessfulCompactionListener().onSuccessfulCompaction(gcType);
+ writer.flush();
+ context.getFlusher().flush();
+ context.getGCListener().info("compaction succeeded in {}, after {} cycles", watch, cycles);
+ return compactionSucceeded(context, gcType, newGeneration, compacted.getRecordId());
+ } else {
+ context.getGCListener().info("compaction failed after {}, and {} cycles", watch, cycles);
+ return compactionAborted(context, newGeneration);
+ }
+ } catch (InterruptedException e) {
+ context.getGCListener().error("compaction interrupted", e);
+ currentThread().interrupt();
+ return compactionAborted(context, newGeneration);
+ } catch (IOException e) {
+ context.getGCListener().error("compaction encountered an error", e);
+ return compactionAborted(context, newGeneration);
+ }
+ }
+
+ private CompactionResult compactionAborted(Context context, GCGeneration generation) {
+ context.getGCListener().compactionFailed(generation);
+ return CompactionResult.aborted(getGcGeneration(context), generation, context.getGCCount());
+ }
+
+ private CompactionResult compactionSucceeded(
+ Context context,
+ SegmentGCOptions.GCType gcType,
+ GCGeneration generation,
+ RecordId compactedRootId
+ ) {
+ context.getGCListener().compactionSucceeded(generation);
+ return CompactionResult.succeeded(gcType, generation, context.getGCOptions(), compactedRootId, context.getGCCount());
+ }
+
+ private SegmentNodeState forceCompact(
+ Context context,
+ final NodeState base,
+ final NodeState onto,
+ final CheckpointCompactor compactor
+ ) throws InterruptedException {
+ RecordId compactedId = setHead(context, headId -> {
+ try {
+ PrintableStopwatch t = PrintableStopwatch.createStarted();
+ SegmentNodeState after = compactor.compact(base, context.getSegmentReader().readNode(headId), onto);
+ if (after != null) {
+ return after.getRecordId();
+ }
+ context.getGCListener().info("compaction cancelled after {}", t);
+ return null;
+ } catch (IOException e) {
+ context.getGCListener().error("error during forced compaction.", e);
+ return null;
+ }
+ });
+ if (compactedId == null) {
+ return null;
+ }
+ return context.getSegmentReader().readNode(compactedId);
+ }
+
+ private RecordId setHead(Context context, Function<RecordId, RecordId> f) throws InterruptedException {
+ return context.getRevisions().setHead(f, timeout(context.getGCOptions().getForceTimeout(), SECONDS));
+ }
+
+ private GCEstimationResult estimateCompactionGain(Context context, boolean full) {
+ return new SizeDeltaGcEstimation(
+ context.getGCOptions().getGcSizeDeltaEstimation(),
+ context.getGCJournal(),
+ context.getTarFiles().size(),
+ full
+ ).estimate();
+ }
+
+ private long size(Context context) {
+ return context.getTarFiles().size();
+ }
+
+ @Override
+ public synchronized List<String> cleanup(Context context) throws IOException {
+ return cleanup(context, CompactionResult.skipped(
+ context.getLastCompactionType(),
+ getGcGeneration(context),
+ context.getGCOptions(),
+ context.getRevisions().getHead(),
+ context.getGCCount()
+ ));
+ }
+
+ private List<String> cleanup(Context context, CompactionResult compactionResult)
+ throws IOException {
+ PrintableStopwatch watch = PrintableStopwatch.createStarted();
+
+ context.getGCListener().info("cleanup started using reclaimer {}", compactionResult.reclaimer());
+ context.getGCListener().updateStatus(CLEANUP.message());
+ context.getSegmentCache().clear();
+
+ // Suggest to the JVM that now would be a good time
+ // to clear stale weak references in the SegmentTracker
+ System.gc();
+
+ TarFiles.CleanupResult cleanupResult = context.getTarFiles().cleanup(newCleanupContext(context, compactionResult.reclaimer()));
+ if (cleanupResult.isInterrupted()) {
+ context.getGCListener().info("cleanup interrupted");
+ }
+ context.getSegmentTracker().clearSegmentIdTables(cleanupResult.getReclaimedSegmentIds(), compactionResult.gcInfo());
+ context.getGCListener().info("cleanup marking files for deletion: {}", toFileNames(cleanupResult.getRemovableFiles()));
+
+ long finalSize = size(context);
+ long reclaimedSize = cleanupResult.getReclaimedSize();
+ context.getFileStoreStats().reclaimed(reclaimedSize);
+ context.getGCJournal().persist(
+ reclaimedSize,
+ finalSize,
+ getGcGeneration(context),
+ context.getCompactionMonitor().getCompactedNodes(),
+ compactionResult.getCompactedRootId().toString10()
+ );
+ context.getGCListener().cleaned(reclaimedSize, finalSize);
+ context.getGCListener().info(
+ "cleanup completed in {}. Post cleanup size is {} and space reclaimed {}.",
+ watch,
+ newPrintableBytes(finalSize),
+ newPrintableBytes(reclaimedSize)
+ );
+ return cleanupResult.getRemovableFiles();
+ }
+
+ private CleanupContext newCleanupContext(Context context, Predicate<GCGeneration> old) {
+ return new CleanupContext() {
+
+ private boolean isUnreferencedBulkSegment(UUID id, boolean referenced) {
+ return !isDataSegmentId(id.getLeastSignificantBits()) && !referenced;
+ }
+
+ private boolean isOldDataSegment(UUID id, GCGeneration generation) {
+ return isDataSegmentId(id.getLeastSignificantBits()) && old.apply(generation);
+ }
+
+ @Override
+ public Collection<UUID> initialReferences() {
+ Set<UUID> references = newHashSet();
+ for (SegmentId id : context.getSegmentTracker().getReferencedSegmentIds()) {
+ if (id.isBulkSegmentId()) {
+ references.add(id.asUUID());
+ }
+ }
+ return references;
+ }
+
+ @Override
+ public boolean shouldReclaim(UUID id, GCGeneration generation, boolean referenced) {
+ return isUnreferencedBulkSegment(id, referenced) || isOldDataSegment(id, generation);
+ }
+
+ @Override
+ public boolean shouldFollow(UUID from, UUID to) {
+ return !isDataSegmentId(to.getLeastSignificantBits());
+ }
+
+ };
+ }
+
+ private String toFileNames(@Nonnull List<String> files) {
+ if (files.isEmpty()) {
+ return "none";
+ } else {
+ return Joiner.on(",").join(files);
+ }
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/DefaultGarbageCollectionStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
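Everything the former inner class read from FileStore now arrives through the Context parameter, which is what makes alternative GC implementations possible. A sketch of a minimal alternative strategy, with the method set inferred from the @Override annotations above (the GarbageCollectionStrategy interface itself is added later in this commit; imports elided, same package assumed):

    // Illustrative only, not part of this commit: a strategy that skips
    // compaction entirely and performs no cleanup.
    class SkippingGarbageCollectionStrategy implements GarbageCollectionStrategy {

        @Override
        public void collectGarbage(Context context) {
            context.getGCListener().skipped("garbage collection disabled");
        }

        @Override
        public void collectFullGarbage(Context context) {
            collectGarbage(context);
        }

        @Override
        public void collectTailGarbage(Context context) {
            collectGarbage(context);
        }

        @Override
        public CompactionResult compactFull(Context context) {
            // Report a skipped compaction for the store's current state.
            return CompactionResult.skipped(
                context.getLastCompactionType(),
                context.getRevisions().getHead().getSegmentId().getGcGeneration(),
                context.getGCOptions(),
                context.getRevisions().getHead(),
                context.getGCCount()
            );
        }

        @Override
        public CompactionResult compactTail(Context context) {
            return compactFull(context);
        }

        @Override
        public List<String> cleanup(Context context) {
            return Collections.emptyList();
        }
    }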
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1829859&r1=1829858&r2=1829859&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Mon Apr 23 11:47:24 2018
@@ -18,73 +18,41 @@
*/
package org.apache.jackrabbit.oak.segment.file;
-import static com.google.common.collect.Sets.newHashSet;
-import static java.lang.Integer.getInteger;
import static java.lang.String.format;
-import static java.lang.System.currentTimeMillis;
-import static java.lang.Thread.currentThread;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder;
-import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType.FULL;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType.TAIL;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.CLEANUP;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION_FORCE_COMPACT;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.COMPACTION_RETRY;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.ESTIMATION;
-import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCStatus.IDLE;
import static org.apache.jackrabbit.oak.segment.file.PrintableBytes.newPrintableBytes;
-import static org.apache.jackrabbit.oak.segment.file.Reclaimers.newOldReclaimer;
-import static org.apache.jackrabbit.oak.segment.file.TarRevisions.EXPEDITE_OPTION;
-import static org.apache.jackrabbit.oak.segment.file.TarRevisions.timeout;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.UncheckedExecutionException;
-import org.apache.jackrabbit.oak.segment.CheckpointCompactor;
import org.apache.jackrabbit.oak.segment.RecordId;
import org.apache.jackrabbit.oak.segment.Segment;
import org.apache.jackrabbit.oak.segment.SegmentId;
import org.apache.jackrabbit.oak.segment.SegmentNodeState;
-import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener;
import org.apache.jackrabbit.oak.segment.SegmentWriter;
-import org.apache.jackrabbit.oak.segment.WriterCacheManager;
import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
-import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.GCType;
-import org.apache.jackrabbit.oak.segment.file.GCJournal.GCJournalEntry;
import org.apache.jackrabbit.oak.segment.file.ShutDown.ShutDownCloser;
-import org.apache.jackrabbit.oak.segment.file.tar.CleanupContext;
import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
+import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,20 +63,8 @@ public class FileStore extends AbstractF
private static final Logger log = LoggerFactory.getLogger(FileStore.class);
- /**
- * Minimal interval in milli seconds between subsequent garbage collection cycles.
- * Garbage collection invoked via {@link #fullGC()} will be skipped unless at least
- * the specified time has passed since its last successful invocation.
- */
- private static final long GC_BACKOFF = getInteger("oak.gc.backoff", 10*3600*1000);
-
private static final int MB = 1024 * 1024;
- /**
- * GC counter for logging purposes
- */
- private static final AtomicLong GC_COUNT = new AtomicLong(0);
-
@Nonnull
private final SegmentWriter segmentWriter;
@@ -152,6 +108,8 @@ public class FileStore extends AbstractF
@Nonnull
private final SegmentNotFoundExceptionListener snfeListener;
+ private final GarbageCollectionStrategy garbageCollectionStrategy = new DefaultGarbageCollectionStrategy();
+
FileStore(final FileStoreBuilder builder) throws InvalidFileStoreVersionException, IOException {
super(builder);
@@ -164,12 +122,6 @@ public class FileStore extends AbstractF
.with(builder.getCacheManager()
.withAccessTracking("WRITE", builder.getStatsProvider()))
.build(this);
- this.garbageCollector = new GarbageCollector(
- builder.getGcOptions(),
- builder.getGcListener(),
- new GCJournal(persistence.getGCJournalFile()),
- builder.getCacheManager()
- .withAccessTracking("COMPACT", builder.getStatsProvider()));
newManifestChecker(persistence, builder.getStrictVersionCheck()).checkAndUpdateManifest();
@@ -190,6 +142,34 @@ public class FileStore extends AbstractF
this.fileReaper = this.tarFiles.createFileReaper();
+ this.garbageCollector = new GarbageCollector(
+ builder.getGcOptions(),
+ builder.getGcListener(),
+ new GCJournal(persistence.getGCJournalFile()),
+ sufficientMemory,
+ fileReaper,
+ tarFiles,
+ tracker,
+ segmentReader,
+ () -> revisions,
+ getBlobStore(),
+ segmentCache,
+ segmentWriter,
+ stats,
+ new CancelCompactionSupplier(
+ () -> !sufficientDiskSpace.get(),
+ () -> !sufficientMemory.get(),
+ shutDown::isShutDown
+ ),
+ this::flush,
+ generation ->
+ defaultSegmentWriterBuilder("c")
+ .with(builder.getCacheManager().withAccessTracking("COMPACT", builder.getStatsProvider()))
+ .withGeneration(generation)
+ .withoutWriterPool()
+ .build(this)
+ );
+
this.snfeListener = builder.getSnfeListener();
fileStoreScheduler.scheduleWithFixedDelay(format("TarMK flush [%s]", directory), 5, SECONDS,
@@ -256,7 +236,7 @@ public class FileStore extends AbstractF
public Runnable getGCRunner() {
return new SafeRunnable(format("TarMK revision gc [%s]", directory), () -> {
try (ShutDownCloser ignored = shutDown.keepAlive()) {
- garbageCollector.run();
+ garbageCollector.run(garbageCollectionStrategy);
} catch (IOException e) {
log.error("Error running revision garbage collection", e);
}
@@ -340,7 +320,7 @@ public class FileStore extends AbstractF
*/
public void fullGC() throws IOException {
try (ShutDownCloser ignored = shutDown.keepAlive()) {
- garbageCollector.runFull();
+ garbageCollector.runFull(garbageCollectionStrategy);
}
}
@@ -349,7 +329,7 @@ public class FileStore extends AbstractF
*/
public void tailGC() throws IOException {
try (ShutDownCloser ignored = shutDown.keepAlive()) {
- garbageCollector.runTail();
+ garbageCollector.runTail(garbageCollectionStrategy);
}
}
@@ -361,13 +341,19 @@ public class FileStore extends AbstractF
*/
public boolean compactFull() {
try (ShutDownCloser ignored = shutDown.keepAlive()) {
- return garbageCollector.compactFull().isSuccess();
+ return garbageCollector.compactFull(garbageCollectionStrategy).isSuccess();
+ } catch (IOException e) {
+ log.warn("Unable to perform full compaction", e);
+ return false;
}
}
public boolean compactTail() {
try (ShutDownCloser ignored = shutDown.keepAlive()) {
- return garbageCollector.compactTail().isSuccess();
+ return garbageCollector.compactTail(garbageCollectionStrategy).isSuccess();
+ } catch (IOException e) {
+ log.warn("Unable to perform tail compaction");
+ return false;
}
}
@@ -380,7 +366,7 @@ public class FileStore extends AbstractF
*/
public void cleanup() throws IOException {
try (ShutDownCloser ignored = shutDown.keepAlive()) {
- fileReaper.add(garbageCollector.cleanup());
+ fileReaper.add(garbageCollector.cleanup(garbageCollectionStrategy));
}
}
@@ -539,650 +525,4 @@ public class FileStore extends AbstractF
}
}
- private class GarbageCollector {
-
- @Nonnull
- private final SegmentGCOptions gcOptions;
-
- /**
- * {@code GcListener} listening to this instance's gc progress
- */
- @Nonnull
- private final PrefixedGCListener gcListener;
-
- @Nonnull
- private final GCJournal gcJournal;
-
- @Nonnull
- private final WriterCacheManager cacheManager;
-
- @Nonnull
- private GCNodeWriteMonitor compactionMonitor = GCNodeWriteMonitor.EMPTY;
-
- private volatile boolean cancelled;
-
- /**
- * Timestamp of the last time {@link #fullGC()} or {@link #tailGC()} was
- * successfully invoked. 0 if never.
- */
- private long lastSuccessfullGC;
-
- /**
- * Last compaction type used to determine which predicate to use during
- * {@link #cleanup() cleanup}. Defaults to {@link GCType#FULL FULL}, which is
- * conservative and safe in case it does not match the real type (e.g. because
- * of a system restart).
- */
- @Nonnull
- private GCType lastCompactionType = FULL;
-
- GarbageCollector(
- @Nonnull SegmentGCOptions gcOptions,
- @Nonnull GCListener gcListener,
- @Nonnull GCJournal gcJournal,
- @Nonnull WriterCacheManager cacheManager) {
- this.gcOptions = gcOptions;
- this.gcListener = new PrefixedGCListener(gcListener, GC_COUNT);
- this.gcJournal = gcJournal;
- this.cacheManager = cacheManager;
- }
-
- GCNodeWriteMonitor getGCNodeWriteMonitor() {
- return compactionMonitor;
- }
-
- synchronized void run() throws IOException {
- switch (gcOptions.getGCType()) {
- case FULL:
- runFull();
- break;
- case TAIL:
- runTail();
- break;
- default:
- throw new IllegalStateException("Invalid GC type");
- }
- }
-
- synchronized void runFull() throws IOException {
- run(true, this::compactFull);
- }
-
- synchronized void runTail() throws IOException {
- run(false, this::compactTail);
- }
-
- private void run(boolean full, Supplier<CompactionResult> compact) throws IOException {
- try {
- GC_COUNT.incrementAndGet();
-
- gcListener.info("started");
-
- long dt = System.currentTimeMillis() - lastSuccessfullGC;
- if (dt < GC_BACKOFF) {
- gcListener.skipped("skipping garbage collection as it already ran " +
- "less than {} hours ago ({} s).", GC_BACKOFF / 3600000, dt / 1000);
- return;
- }
-
- boolean sufficientEstimatedGain = true;
- if (gcOptions.isEstimationDisabled()) {
- gcListener.info("estimation skipped because it was explicitly disabled");
- } else if (gcOptions.isPaused()) {
- gcListener.info("estimation skipped because compaction is paused");
- } else {
- gcListener.info("estimation started");
- gcListener.updateStatus(ESTIMATION.message());
-
- PrintableStopwatch watch = PrintableStopwatch.createStarted();
- GCEstimationResult estimation = estimateCompactionGain(full);
- sufficientEstimatedGain = estimation.isGcNeeded();
- String gcLog = estimation.getGcLog();
- if (sufficientEstimatedGain) {
- gcListener.info("estimation completed in {}. {}", watch, gcLog);
- } else {
- gcListener.skipped("estimation completed in {}. {}", watch, gcLog);
- }
- }
-
- if (sufficientEstimatedGain) {
- try (GCMemoryBarrier ignored = new GCMemoryBarrier(sufficientMemory, gcListener, gcOptions)) {
- if (gcOptions.isPaused()) {
- gcListener.skipped("compaction paused");
- } else if (!sufficientMemory.get()) {
- gcListener.skipped("compaction skipped. Not enough memory");
- } else {
- CompactionResult compactionResult = compact.get();
- if (compactionResult.isSuccess()) {
- lastSuccessfullGC = System.currentTimeMillis();
- } else {
- gcListener.info("cleaning up after failed compaction");
- }
- fileReaper.add(cleanup(compactionResult));
- }
- }
- }
- } finally {
- compactionMonitor.finished();
- gcListener.updateStatus(IDLE.message());
- }
- }
-
- /**
- * Estimated compaction gain. The result will be undefined if stopped through
- * the passed {@code stop} signal.
- * @return compaction gain estimate
- */
- GCEstimationResult estimateCompactionGain(boolean full) {
- return new SizeDeltaGcEstimation(gcOptions.getGcSizeDeltaEstimation(), gcJournal, tarFiles.size(), full).estimate();
- }
-
- @Nonnull
- private CompactionResult compactionAborted(@Nonnull GCGeneration generation) {
- gcListener.compactionFailed(generation);
- return CompactionResult.aborted(getGcGeneration(), generation);
- }
-
- @Nonnull
- private CompactionResult compactionSucceeded(
- @Nonnull GCType gcType,
- @Nonnull GCGeneration generation,
- @Nonnull RecordId compactedRootId) {
- gcListener.compactionSucceeded(generation);
- return CompactionResult.succeeded(gcType, generation, gcOptions, compactedRootId);
- }
-
- @CheckForNull
- private SegmentNodeState getBase() {
- String root = gcJournal.read().getRoot();
- RecordId rootId = RecordId.fromString(tracker, root);
- if (RecordId.NULL.equals(rootId)) {
- return null;
- }
- try {
- SegmentNodeState node = segmentReader.readNode(rootId);
- node.getPropertyCount(); // Resilience: fail early with a SNFE if the segment is not there
- return node;
- } catch (SegmentNotFoundException snfe) {
- gcListener.error("base state " + rootId + " is not accessible", snfe);
- return null;
- }
- }
-
- synchronized CompactionResult compactFull() {
- gcListener.info("running full compaction");
- return compact(FULL, EMPTY_NODE, getGcGeneration().nextFull());
- }
-
- synchronized CompactionResult compactTail() {
- gcListener.info("running tail compaction");
- SegmentNodeState base = getBase();
- if (base != null) {
- return compact(TAIL, base, getGcGeneration().nextTail());
- }
- gcListener.info("no base state available, running full compaction instead");
- return compact(FULL, EMPTY_NODE, getGcGeneration().nextFull());
- }
-
- private CompactionResult compact(
- @Nonnull GCType gcType,
- @Nonnull NodeState base,
- @Nonnull GCGeneration newGeneration) {
- try {
- PrintableStopwatch watch = PrintableStopwatch.createStarted();
- gcListener.info("compaction started, gc options={}, current generation={}, new generation={}",
- gcOptions, getHead().getRecordId().getSegment().getGcGeneration(), newGeneration);
- gcListener.updateStatus(COMPACTION.message());
-
- GCJournalEntry gcEntry = gcJournal.read();
- long initialSize = size();
-
- SegmentWriter writer = defaultSegmentWriterBuilder("c")
- .with(cacheManager)
- .withGeneration(newGeneration)
- .withoutWriterPool()
- .build(FileStore.this);
-
- CancelCompactionSupplier cancel = new CancelCompactionSupplier(FileStore.this);
-
- compactionMonitor = new GCNodeWriteMonitor(gcOptions.getGcLogInterval(), gcListener);
- compactionMonitor.init(gcEntry.getRepoSize(), gcEntry.getNodes(), initialSize);
-
- CheckpointCompactor compactor = new CheckpointCompactor(gcListener,
- segmentReader, writer, getBlobStore(), cancel, compactionMonitor);
-
- SegmentNodeState head = getHead();
- SegmentNodeState compacted = compactor.compact(base, head, base);
- if (compacted == null) {
- gcListener.warn("compaction cancelled: {}.", cancel);
- return compactionAborted(newGeneration);
- }
-
- gcListener.info("compaction cycle 0 completed in {}. Compacted {} to {}",
- watch, head.getRecordId(), compacted.getRecordId());
-
- int cycles = 0;
- boolean success = false;
- SegmentNodeState previousHead = head;
- while (cycles < gcOptions.getRetryCount() &&
- !(success = revisions.setHead(previousHead.getRecordId(), compacted.getRecordId(), EXPEDITE_OPTION))) {
- // Some other concurrent changes have been made.
- // Rebase (and compact) those changes on top of the
- // compacted state before retrying to set the head.
- cycles++;
- gcListener.info("compaction detected concurrent commits while compacting. " +
- "Compacting these commits. Cycle {} of {}",
- cycles, gcOptions.getRetryCount());
- gcListener.updateStatus(COMPACTION_RETRY.message() + cycles);
- PrintableStopwatch cycleWatch = PrintableStopwatch.createStarted();
-
- head = getHead();
- compacted = compactor.compact(previousHead, head, compacted);
- if (compacted == null) {
- gcListener.warn("compaction cancelled: {}.", cancel);
- return compactionAborted(newGeneration);
- }
-
- gcListener.info("compaction cycle {} completed in {}. Compacted {} against {} to {}",
- cycles, cycleWatch, head.getRecordId(), previousHead.getRecordId(), compacted.getRecordId());
- previousHead = head;
- }
-
- if (!success) {
- gcListener.info("compaction gave up compacting concurrent commits after {} cycles.",
- cycles);
- int forceTimeout = gcOptions.getForceTimeout();
- if (forceTimeout > 0) {
- gcListener.info("trying to force compact remaining commits for {} seconds. " +
- "Concurrent commits to the store will be blocked.",
- forceTimeout);
- gcListener.updateStatus(COMPACTION_FORCE_COMPACT.message());
- PrintableStopwatch forceWatch = PrintableStopwatch.createStarted();
-
- cycles++;
- cancel.timeOutAfter(forceTimeout, SECONDS);
- compacted = forceCompact(previousHead, compacted, compactor);
- success = compacted != null;
- if (success) {
- gcListener.info("compaction succeeded to force compact remaining commits " +
- "after {}.", forceWatch);
- } else {
- if (cancel.get()) {
- gcListener.warn("compaction failed to force compact remaining commits " +
- "after {}. Compaction was cancelled: {}.",
- forceWatch, cancel);
- } else {
- gcListener.warn("compaction failed to force compact remaining commits. " +
- "after {}. Could not acquire exclusive access to the node store.",
- forceWatch);
- }
- }
- }
- }
-
- if (success) {
- // Update type of the last compaction before calling methods that could throw an exception.
- lastCompactionType = gcType;
- writer.flush();
- flush();
- gcListener.info("compaction succeeded in {}, after {} cycles", watch, cycles);
- return compactionSucceeded(gcType, newGeneration, compacted.getRecordId());
- } else {
- gcListener.info("compaction failed after {}, and {} cycles", watch, cycles);
- return compactionAborted(newGeneration);
- }
- } catch (InterruptedException e) {
- gcListener.error("compaction interrupted", e);
- currentThread().interrupt();
- return compactionAborted(newGeneration);
- } catch (IOException e) {
- gcListener.error("compaction encountered an error", e);
- return compactionAborted(newGeneration);
- }
- }
-
- private SegmentNodeState forceCompact(
- @Nonnull final NodeState base,
- @Nonnull final NodeState onto,
- @Nonnull final CheckpointCompactor compactor)
- throws InterruptedException {
- RecordId compactedId = revisions.setHead(new Function<RecordId, RecordId>() {
- @Nullable
- @Override
- public RecordId apply(RecordId headId) {
- try {
- long t0 = currentTimeMillis();
- SegmentNodeState after = compactor.compact(
- base, segmentReader.readNode(headId), onto);
- if (after == null) {
- gcListener.info("compaction cancelled after {} seconds",
- (currentTimeMillis() - t0) / 1000);
- return null;
- } else {
- return after.getRecordId();
- }
- } catch (IOException e) {
- gcListener.error("error during forced compaction.", e);
- return null;
- }
- }
- }, timeout(gcOptions.getForceTimeout(), SECONDS));
- return compactedId != null
- ? segmentReader.readNode(compactedId)
- : null;
- }
-
- private CleanupContext newCleanupContext(Predicate<GCGeneration> old) {
- return new CleanupContext() {
-
- private boolean isUnreferencedBulkSegment(UUID id, boolean referenced) {
- return !isDataSegmentId(id.getLeastSignificantBits()) && !referenced;
- }
-
- private boolean isOldDataSegment(UUID id, GCGeneration generation) {
- return isDataSegmentId(id.getLeastSignificantBits()) && old.apply(generation);
- }
-
- @Override
- public Collection<UUID> initialReferences() {
- Set<UUID> references = newHashSet();
- for (SegmentId id : tracker.getReferencedSegmentIds()) {
- if (id.isBulkSegmentId()) {
- references.add(id.asUUID());
- }
- }
- return references;
- }
-
- @Override
- public boolean shouldReclaim(UUID id, GCGeneration generation, boolean referenced) {
- return isUnreferencedBulkSegment(id, referenced) || isOldDataSegment(id, generation);
- }
-
- @Override
- public boolean shouldFollow(UUID from, UUID to) {
- return !isDataSegmentId(to.getLeastSignificantBits());
- }
-
- };
- }
-
- /**
- * Cleanup segments whose generation matches the reclaim predicate determined by
- * the {@link #lastCompactionType last successful compaction}.
- * @return list of files to be removed
- * @throws IOException
- */
- @Nonnull
- synchronized List<String> cleanup() throws IOException {
- return cleanup(CompactionResult.skipped(
- lastCompactionType,
- getGcGeneration(),
- garbageCollector.gcOptions,
- revisions.getHead()
- ));
- }
-
- /**
- * Cleanup segments whose generation matches the {@link CompactionResult#reclaimer()} predicate.
- * @return list of files to be removed
- * @throws IOException
- */
- @Nonnull
- private List<String> cleanup(@Nonnull CompactionResult compactionResult)
- throws IOException {
- PrintableStopwatch watch = PrintableStopwatch.createStarted();
-
- gcListener.info("cleanup started using reclaimer {}", compactionResult.reclaimer());
- gcListener.updateStatus(CLEANUP.message());
- segmentCache.clear();
-
- // Suggest to the JVM that now would be a good time
- // to clear stale weak references in the SegmentTracker
- System.gc();
-
- TarFiles.CleanupResult cleanupResult = tarFiles.cleanup(newCleanupContext(compactionResult.reclaimer()));
- if (cleanupResult.isInterrupted()) {
- gcListener.info("cleanup interrupted");
- }
- tracker.clearSegmentIdTables(cleanupResult.getReclaimedSegmentIds(), compactionResult.gcInfo());
- gcListener.info("cleanup marking files for deletion: {}", toFileNames(cleanupResult.getRemovableFiles()));
-
- long finalSize = size();
- long reclaimedSize = cleanupResult.getReclaimedSize();
- stats.reclaimed(reclaimedSize);
- gcJournal.persist(reclaimedSize, finalSize, getGcGeneration(),
- compactionMonitor.getCompactedNodes(),
- compactionResult.getCompactedRootId().toString10());
- gcListener.cleaned(reclaimedSize, finalSize);
- gcListener.info(
- "cleanup completed in {}. Post cleanup size is {} and space reclaimed {}.",
- watch,
- newPrintableBytes(finalSize),
- newPrintableBytes(reclaimedSize));
- return cleanupResult.getRemovableFiles();
- }
-
- private String toFileNames(@Nonnull List<String> files) {
- if (files.isEmpty()) {
- return "none";
- } else {
- return Joiner.on(",").join(files);
- }
- }
-
- /**
- * Finds all external blob references that are currently accessible
- * in this repository and adds them to the given collector. Useful
- * for collecting garbage in an external data store.
- * <p>
- * Note that this method only collects blob references that are already
- * stored in the repository (at the time when this method is called), so
- * the garbage collector will need some other mechanism for tracking
- * in-memory references and references stored while this method is
- * running.
- * @param collector reference collector called back for each blob reference found
- */
- synchronized void collectBlobReferences(Consumer<String> collector) throws IOException {
- segmentWriter.flush();
- tarFiles.collectBlobReferences(collector,
- newOldReclaimer(lastCompactionType, getGcGeneration(), gcOptions.getRetainedGenerations()));
- }
-
- void cancel() {
- cancelled = true;
- }
-
- /**
- * Represents the cancellation policy for the compaction phase. If the disk
- * space was considered insufficient at least once during compaction (or if
- * the space was never sufficient to begin with), compaction is considered
- * canceled. Furthermore when the file store is shutting down, compaction is
- * considered canceled.
- * Finally the cancellation can be triggered by a timeout that can be set
- * at any time.
- */
- private class CancelCompactionSupplier implements Supplier<Boolean> {
- private final FileStore store;
-
- private String reason;
- private volatile long baseLine;
- private volatile long deadline;
-
- public CancelCompactionSupplier(@Nonnull FileStore store) {
- cancelled = false;
- this.store = store;
- }
-
- /**
- * Set a timeout for cancellation. Setting a different timeout cancels
- * a previous one that did not yet elapse. Setting a timeout after
- * cancellation took place has no effect.
- */
- public void timeOutAfter(final long duration, @Nonnull final TimeUnit unit) {
- baseLine = currentTimeMillis();
- deadline = baseLine + MILLISECONDS.convert(duration, unit);
- }
-
- @Override
- public Boolean get() {
- // The outOfDiskSpace and shutdown flags can only transition from
- // false (their initial values), to true. Once true, there should
- // be no way to go back.
- if (!store.sufficientDiskSpace.get()) {
- reason = "Not enough disk space";
- return true;
- }
- if (!store.sufficientMemory.get()) {
- reason = "Not enough memory";
- return true;
- }
- if (store.shutDown.isShutDown()) {
- reason = "The FileStore is shutting down";
- return true;
- }
- if (cancelled) {
- reason = "Cancelled by user";
- return true;
- }
- if (deadline > 0 && currentTimeMillis() > deadline) {
- long dt = SECONDS.convert(currentTimeMillis() - baseLine, MILLISECONDS);
- reason = "Timeout after " + dt + " seconds";
- return true;
- }
- return false;
- }
-
- @Override
- public String toString() { return reason; }
- }
- }
-
- /**
- * Instances of this class represent the result from a compaction. Either
- * {@link #succeeded(GCType, GCGeneration, SegmentGCOptions, RecordId) succeeded},
- * {@link #aborted(GCGeneration, GCGeneration) aborted} or {@link
- * #skipped(GCType, GCGeneration, SegmentGCOptions, RecordId)} skipped}.
- */
- private abstract static class CompactionResult {
- @Nonnull
- private final GCGeneration currentGeneration;
-
- protected CompactionResult(@Nonnull GCGeneration currentGeneration) {
- this.currentGeneration = currentGeneration;
- }
-
- /**
- * Result of a succeeded compaction.
- * @param gcType the type of the succeeded compaction operation
- * @param newGeneration the generation successfully created by compaction
- * @param gcOptions the current GC options used by compaction
- * @param compactedRootId the record id of the root created by compaction
- */
- static CompactionResult succeeded(
- @Nonnull GCType gcType,
- @Nonnull GCGeneration newGeneration,
- @Nonnull final SegmentGCOptions gcOptions,
- @Nonnull final RecordId compactedRootId) {
- return new CompactionResult(newGeneration) {
- @Override
- Predicate<GCGeneration> reclaimer() {
- return newOldReclaimer(gcType, newGeneration, gcOptions.getRetainedGenerations());
- }
-
- @Override
- boolean isSuccess() {
- return true;
- }
-
- @Override
- RecordId getCompactedRootId() {
- return compactedRootId;
- }
- };
- }
-
- /**
- * Result of an aborted compaction.
- * @param currentGeneration the current generation of the store
- * @param failedGeneration the generation that compaction attempted to create
- */
- static CompactionResult aborted(
- @Nonnull GCGeneration currentGeneration,
- @Nonnull final GCGeneration failedGeneration) {
- return new CompactionResult(currentGeneration) {
- @Override
- Predicate<GCGeneration> reclaimer() {
- return Reclaimers.newExactReclaimer(failedGeneration);
- }
-
- @Override
- boolean isSuccess() {
- return false;
- }
- };
- }
-
- /**
- * Result serving as a placeholder for a compaction that was skipped.
- * @param lastGCType the type of the most recent GC operation, or {@link GCType#FULL} if none
- * @param currentGeneration the current generation of the store
- * @param gcOptions the current GC options used by compaction
- * @param compactedRootId the record id reported as the compacted root
- */
- static CompactionResult skipped(
- @Nonnull GCType lastGCType,
- @Nonnull GCGeneration currentGeneration,
- @Nonnull final SegmentGCOptions gcOptions,
- @Nonnull final RecordId compactedRootId) {
- return new CompactionResult(currentGeneration) {
- @Override
- Predicate<GCGeneration> reclaimer() {
- return Reclaimers.newOldReclaimer(lastGCType, currentGeneration, gcOptions.getRetainedGenerations());
- }
-
- @Override
- boolean isSuccess() {
- return true;
- }
-
- @Override
- RecordId getCompactedRootId() {
- return compactedRootId;
- }
- };
- }
-
- /**
- * @return a predicate determining which segments to
- * {@link GarbageCollector#cleanup(CompactionResult) clean up} for
- * the given compaction result.
- */
- abstract Predicate<GCGeneration> reclaimer();
-
- /**
- * @return {@code true} for {@link #succeeded(GCType, GCGeneration, SegmentGCOptions, RecordId) succeeded}
- * and {@link #skipped(GCType, GCGeneration, SegmentGCOptions, RecordId) skipped}, {@code false} otherwise.
- */
- abstract boolean isSuccess();
-
- /**
- * @return the record id of the compacted root on {@link #isSuccess() success},
- * {@link RecordId#NULL} otherwise.
- */
- RecordId getCompactedRootId() {
- return RecordId.NULL;
- }
-
- /**
- * @return a diagnostic message describing the outcome of this compaction.
- */
- String gcInfo() {
- return "gc-count=" + GC_COUNT +
- ",gc-status=" + (isSuccess() ? "success" : "failed") +
- ",store-generation=" + currentGeneration +
- ",reclaim-predicate=" + reclaimer();
- }
-
- }
-
}
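
The blob-reference pass removed above feeds garbage collection in an external data store. A minimal sketch of a caller; the fileStore handle and the surrounding wiring are hypothetical stand-ins, not part of this commit:

    Set<String> references = new HashSet<>();   // java.util.HashSet
    fileStore.collectBlobReferences(references::add);
    // "references" now holds every blob id reachable from stored segments;
    // per the javadoc, ids written after this call must be tracked elsewhere.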
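
The canceller removed above bundles several independent cancellation sources (disk space, memory, shutdown, user request, timeout) behind a single Supplier<Boolean>. A minimal sketch of how a compaction loop might poll it; fileStore, hasMoreCompactionWork and compactNextBatch are hypothetical stand-ins, not part of this commit:

    CancelCompactionSupplier cancel = new CancelCompactionSupplier(fileStore);
    cancel.timeOutAfter(30, TimeUnit.MINUTES);  // java.util.concurrent.TimeUnit
    while (hasMoreCompactionWork()) {           // hypothetical helper
        if (cancel.get()) {
            // toString() reports the reason: disk, memory, shutdown, user or timeout
            System.out.println("Compaction canceled: " + cancel);
            break;
        }
        compactNextBatch();                     // hypothetical helper
    }

Because get() re-evaluates every condition on each call, the deadline set via timeOutAfter(...) can be moved while the loop runs, which is the "setting a different timeout cancels a previous one" behavior documented above.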
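
CompactionResult collapses the three possible outcomes (succeeded, aborted, skipped) into one object that callers query uniformly. A hedged sketch of consuming one; compact() is a hypothetical producer, and Predicate is whichever variant the surrounding code imports:

    CompactionResult result = compact();              // hypothetical producer
    if (result.isSuccess()) {
        // RecordId.NULL is returned only for aborted results; a skipped
        // result still carries the root id it was constructed with
        RecordId compactedRoot = result.getCompactedRootId();
    }
    // Every outcome knows which generations a subsequent cleanup may reclaim:
    Predicate<GCGeneration> reclaim = result.reclaimer();
    System.out.println(result.gcInfo());

The asymmetry is in reclaimer(): an aborted result reclaims exactly the failed generation, while succeeded and skipped results reclaim generations older than the configured retention limit.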
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java?rev=1829859&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java Mon Apr 23 11:47:24 2018
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.jackrabbit.oak.segment.Revisions;
+import org.apache.jackrabbit.oak.segment.SegmentCache;
+import org.apache.jackrabbit.oak.segment.SegmentReader;
+import org.apache.jackrabbit.oak.segment.SegmentTracker;
+import org.apache.jackrabbit.oak.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
+import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+
+interface GarbageCollectionStrategy {
+
+ interface SuccessfulGarbageCollectionListener {
+
+ void onSuccessfulGarbageCollection();
+
+ }
+
+ interface SuccessfulCompactionListener {
+
+ void onSuccessfulCompaction(SegmentGCOptions.GCType type);
+
+ }
+
+ interface SegmentWriterFactory {
+
+ SegmentWriter newSegmentWriter(GCGeneration generation);
+
+ }
+
+ interface Context {
+
+ SegmentGCOptions getGCOptions();
+
+ GCListener getGCListener();
+
+ Revisions getRevisions();
+
+ GCJournal getGCJournal();
+
+ SegmentTracker getSegmentTracker();
+
+ SegmentWriterFactory getSegmentWriterFactory();
+
+ GCNodeWriteMonitor getCompactionMonitor();
+
+ BlobStore getBlobStore();
+
+ CancelCompactionSupplier getCanceller();
+
+ long getLastSuccessfulGC();
+
+ TarFiles getTarFiles();
+
+ AtomicBoolean getSufficientMemory();
+
+ FileReaper getFileReaper();
+
+ SuccessfulGarbageCollectionListener getSuccessfulGarbageCollectionListener();
+
+ SuccessfulCompactionListener getSuccessfulCompactionListener();
+
+ Flusher getFlusher();
+
+ long getGCBackOff();
+
+ SegmentGCOptions.GCType getLastCompactionType();
+
+ int getGCCount();
+
+ SegmentCache getSegmentCache();
+
+ FileStoreStats getFileStoreStats();
+
+ SegmentReader getSegmentReader();
+
+ }
+
+ void collectGarbage(Context context) throws IOException;
+
+ void collectFullGarbage(Context context) throws IOException;
+
+ void collectTailGarbage(Context context) throws IOException;
+
+ CompactionResult compactFull(Context context) throws IOException;
+
+ CompactionResult compactTail(Context context) throws IOException;
+
+ List<String> cleanup(Context context) throws IOException;
+
+}
Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/GarbageCollectionStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
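
The interface above is the core of OAK-7377: instead of hard-coding its GC algorithm, FileStore can delegate to any GarbageCollectionStrategy and hand it every collaborator through the Context. A minimal sketch of an alternative implementation, illustrative only and not part of this commit, assuming GCListener inherits info(String, Object...) from GCMonitor:

    package org.apache.jackrabbit.oak.segment.file;

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    // Illustrative strategy that never compacts and never deletes files,
    // e.g. as a stand-in for tests. Not part of this commit.
    class NoopGarbageCollectionStrategy implements GarbageCollectionStrategy {

        @Override
        public void collectGarbage(Context context) throws IOException {
            collectFullGarbage(context); // both entry points behave the same
        }

        @Override
        public void collectFullGarbage(Context context) {
            context.getGCListener().info("gc skipped by no-op strategy");
        }

        @Override
        public void collectTailGarbage(Context context) {
            context.getGCListener().info("gc skipped by no-op strategy");
        }

        @Override
        public CompactionResult compactFull(Context context) {
            // A real strategy would return CompactionResult.skipped(...) here;
            // the exact arguments depend on state the strategy tracks.
            throw new UnsupportedOperationException("no-op strategy does not compact");
        }

        @Override
        public CompactionResult compactTail(Context context) {
            throw new UnsupportedOperationException("no-op strategy does not compact");
        }

        @Override
        public List<String> cleanup(Context context) {
            return Collections.emptyList(); // nothing is ever reclaimed
        }
    }

Passing the collaborators through a Context rather than through constructor parameters keeps a strategy stateless, so FileStore can swap implementations (such as the extracted DefaultGarbageCollectionStrategy) without rewiring its fields.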