Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2015/10/09 11:33:56 UTC

svn commit: r1707681 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/segment/file/ test/java/org/apache/jackrabbit/oak/plugins/segment/file/

Author: mduerig
Date: Fri Oct  9 09:33:56 2015
New Revision: 1707681

URL: http://svn.apache.org/viewvc?rev=1707681&view=rev
Log:
OAK-3290: Revision gc blocks repository shutdown
Signal repository shutdown to gain estimator and cleanup
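
The shape of the change: FileStore.close() now flips a volatile shutdown flag, and the
long-running revision gc tasks (gain estimation and cleanup) poll that flag through a
Guava Supplier<Boolean> so they can bail out early instead of blocking shutdown. A
minimal sketch of the pattern, using simplified, hypothetical names rather than the
actual Oak classes:

    import com.google.common.base.Supplier;

    class Store {
        // volatile so background gc threads see the write made by close()
        private volatile boolean shutdown;

        // Signal polled by long-running gc tasks.
        Supplier<Boolean> newShutdownSignal() {
            return new Supplier<Boolean>() {
                @Override
                public Boolean get() {
                    return shutdown;
                }
            };
        }

        void close() {
            shutdown = true;  // gc tasks observe this on their next poll and stop
            // ... join background threads, release resources ...
        }
    }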

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionGainEstimate.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionGainEstimate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionGainEstimate.java?rev=1707681&r1=1707680&r2=1707681&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionGainEstimate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionGainEstimate.java Fri Oct  9 09:33:56 2015
@@ -21,6 +21,7 @@ import static org.apache.jackrabbit.oak.
 import java.io.File;
 import java.util.UUID;
 
+import com.google.common.base.Supplier;
 import com.google.common.hash.BloomFilter;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.PrimitiveSink;
@@ -49,13 +50,22 @@ class CompactionGainEstimate implements
 
     private long reachableSize = 0;
 
-    CompactionGainEstimate(SegmentNodeState node, int estimatedBulkCount) {
+    /**
+     * Create a new gain estimator. The estimation process can be stopped by having
+     * the {@code stop} supplier return {@code true}, in which case the returned
+     * estimates are undefined.
+     *
+     * @param node  root node state
+     * @param estimatedBulkCount  estimated number of bulk segments; used to size the Bloom filter
+     * @param stop  stop signal
+     */
+    CompactionGainEstimate(SegmentNodeState node, int estimatedBulkCount, Supplier<Boolean> stop) {
         uuids = BloomFilter.create(UUID_FUNNEL, estimatedBulkCount);
-        collectReferencedSegments(node, new RecordIdSet());
+        collectReferencedSegments(node, new RecordIdSet(), stop);
     }
 
-    private void collectReferencedSegments(SegmentNodeState node, RecordIdSet visited) {
-        if (visited.addIfNotPresent(node.getRecordId())) {
+    private void collectReferencedSegments(SegmentNodeState node, RecordIdSet visited, Supplier<Boolean> stop) {
+        if (!stop.get() && visited.addIfNotPresent(node.getRecordId())) {
             collectUUID(node.getRecordId().getSegmentId());
             for (PropertyState property : node.getProperties()) {
                 if (property instanceof SegmentPropertyState) {
@@ -75,7 +85,7 @@ class CompactionGainEstimate implements
             }
             for (ChildNodeEntry child : node.getChildNodeEntries()) {
                 collectReferencedSegments((SegmentNodeState) child.getNodeState(),
-                        visited);
+                        visited, stop);
             }
         }
     }
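
The estimator consults the stop signal before visiting each node, so a shutdown request
takes effect within one node visit; the Bloom filter is then only partially populated,
which is why the returned estimates are documented as undefined. A stand-alone sketch of
the same cooperative-cancellation traversal, with a hypothetical Node type standing in
for SegmentNodeState:

    import java.util.UUID;

    import com.google.common.base.Supplier;
    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnel;
    import com.google.common.hash.PrimitiveSink;

    class SegmentCollector {
        /** Hypothetical stand-in for SegmentNodeState. */
        interface Node {
            UUID segmentId();
            Iterable<Node> children();
        }

        private static final Funnel<UUID> UUID_FUNNEL = new Funnel<UUID>() {
            @Override
            public void funnel(UUID from, PrimitiveSink into) {
                into.putLong(from.getMostSignificantBits());
                into.putLong(from.getLeastSignificantBits());
            }
        };

        private final BloomFilter<UUID> seen;

        SegmentCollector(int expectedCount) {
            // expectedCount plays the role of estimatedBulkCount: it sizes the filter
            seen = BloomFilter.create(UUID_FUNNEL, expectedCount);
        }

        void collect(Node node, Supplier<Boolean> stop) {
            if (stop.get()) {
                return;  // cooperative cancellation: abandon the traversal mid-way
            }
            seen.put(node.segmentId());
            for (Node child : node.children()) {
                collect(child, stop);
            }
        }
    }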

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1707681&r1=1707680&r2=1707681&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Fri Oct  9 09:33:56 2015
@@ -16,6 +16,47 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.file;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.newArrayListWithCapacity;
+import static com.google.common.collect.Lists.newLinkedList;
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Maps.newLinkedHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.apache.jackrabbit.oak.plugins.segment.CompactionMap.sum;
+import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.NO_COMPACTION;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nonnull;
+
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import org.apache.jackrabbit.oak.api.Blob;
@@ -41,46 +82,6 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileLock;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static com.google.common.collect.Lists.newLinkedList;
-import static com.google.common.collect.Maps.newHashMap;
-import static com.google.common.collect.Maps.newLinkedHashMap;
-import static com.google.common.collect.Sets.newHashSet;
-import static java.lang.String.format;
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.singletonMap;
-import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
-import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
-import static org.apache.jackrabbit.oak.plugins.segment.CompactionMap.sum;
-import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.NO_COMPACTION;
-
 /**
  * The storage implementation for tar files.
  */
@@ -194,6 +195,11 @@ public class FileStore implements Segmen
     private final AtomicBoolean sufficientDiskSpace;
 
     /**
+     * Flag signalling shutdown of the file store
+     */
+    private volatile boolean shutdown;
+
+    /**
      * Create a new instance of a {@link Builder} for a file store.
      * @param directory  directory where the tar files are stored
      * @return a new {@link Builder} instance.
@@ -456,7 +462,7 @@ public class FileStore implements Segmen
         }
 
         if (!readonly) {
-            this.flushThread = new BackgroundThread(
+            flushThread = new BackgroundThread(
                     "TarMK flush thread [" + directory + "]", 5000, // 5s interval
                     new Runnable() {
                         @Override
@@ -469,7 +475,7 @@ public class FileStore implements Segmen
                             }
                         }
                     });
-            this.compactionThread = new BackgroundThread(
+            compactionThread = new BackgroundThread(
                     "TarMK compaction thread [" + directory + "]", -1,
                     new Runnable() {
                         @Override
@@ -478,7 +484,8 @@ public class FileStore implements Segmen
                         }
                     });
 
-            diskSpaceThread = new BackgroundThread("TarMK disk space check [" + directory + "]", TimeUnit.MINUTES.toMillis(1), new Runnable() {
+            diskSpaceThread = new BackgroundThread(
+                    "TarMK disk space check [" + directory + "]", MINUTES.toMillis(1), new Runnable() {
 
                 @Override
                 public void run() {
@@ -489,8 +496,8 @@ public class FileStore implements Segmen
 
             approximateSize = new AtomicLong(size());
         } else {
-            this.flushThread = null;
-            this.compactionThread = null;
+            flushThread = null;
+            compactionThread = null;
             diskSpaceThread = null;
             approximateSize = null;
         }
@@ -506,7 +513,7 @@ public class FileStore implements Segmen
     }
 
     public boolean maybeCompact(boolean cleanup) {
-        log.info("TarMK compaction started");
+        gcMonitor.info("TarMK compaction started");
 
         Runtime runtime = Runtime.getRuntime();
         long avail = runtime.totalMemory() - runtime.freeMemory();
@@ -538,7 +545,13 @@ public class FileStore implements Segmen
         byte gainThreshold = compactionStrategy.getGainThreshold();
         boolean runCompaction = true;
         if (gainThreshold > 0) {
-            CompactionGainEstimate estimate = estimateCompactionGain();
+            Supplier<Boolean> shutdown = newShutdownSignal();
+            CompactionGainEstimate estimate = estimateCompactionGain(shutdown);
+            if (shutdown.get()) {
+                gcMonitor.info("Compaction estimation interrupted. Skipping compaction.");
+                return false;
+            }
+
             long gain = estimate.estimateCompactionGain(offset);
             runCompaction = gain >= gainThreshold;
             if (runCompaction) {
@@ -672,12 +685,20 @@ public class FileStore implements Segmen
         return count;
     }
 
-    CompactionGainEstimate estimateCompactionGain() {
-        CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(),
-                count());
+    /**
+     * Estimate the compaction gain. The result is undefined if the estimation is
+     * stopped through the passed {@code stop} signal.
+     * @param stop  signal for stopping the estimation process.
+     * @return compaction gain estimate
+     */
+    CompactionGainEstimate estimateCompactionGain(Supplier<Boolean> stop) {
+        CompactionGainEstimate estimate = new CompactionGainEstimate(getHead(), count(), stop);
         synchronized (this) {
             for (TarReader reader : readers) {
                 reader.accept(estimate);
+                if (stop.get()) {
+                    break;
+                }
             }
         }
         return estimate;
@@ -763,7 +784,7 @@ public class FileStore implements Segmen
             }
             writer.collectReferences(referencedIds);
             for (TarReader reader : readers) {
-                cleaned.put(reader, null);
+                cleaned.put(reader, reader);
             }
         }
 
@@ -773,8 +794,11 @@ public class FileStore implements Segmen
         LinkedList<File> toRemove = newLinkedList();
         Set<UUID> cleanedIds = newHashSet();
         for (TarReader reader : cleaned.keySet()) {
-            TarReader newReader = reader.cleanup(referencedIds, cm, cleanedIds);
-            cleaned.put(reader, newReader);
+            cleaned.put(reader, reader.cleanup(referencedIds, cm, cleanedIds));
+            if (shutdown) {
+                gcMonitor.info("TarMK revision cleanup interrupted");
+                break;
+            }
         }
 
         List<TarReader> oldReaders = newArrayList();
@@ -832,28 +856,55 @@ public class FileStore implements Segmen
      * space was considered insufficient at least once during compaction (or if
      * the space was never sufficient to begin with), compaction is considered
      * canceled.
+     * Furthermore, when the file store is shutting down, compaction is considered
+     * canceled.
      *
      * @return a flag indicating if compaction should be canceled.
      */
     private Supplier<Boolean> newCancelCompactionCondition() {
         return new Supplier<Boolean>() {
 
-            private boolean canceled = false;
+            private boolean outOfDiskSpace;
+            private boolean shutdown;
 
             @Override
             public Boolean get() {
 
-                // The canceled flag can only transition from false (its initial
-                // value), to true. Once compaction is considered canceled,
-                // there should be no way to go back.
-
+                // The outOfDiskSpace and shutdown flags can only transition from false (their initial
+                // values), to true. Once true, there should be no way to go back.
                 if (!sufficientDiskSpace.get()) {
-                    canceled = true;
+                    outOfDiskSpace = true;
+                }
+                if (FileStore.this.shutdown) {
+                    this.shutdown = true;
                 }
 
-                return canceled;
+                return shutdown || outOfDiskSpace;
+            }
+
+            @Override
+            public String toString() {
+                if (outOfDiskSpace) {
+                    return "Not enough disk space available";
+                } else if (shutdown) {
+                    return "FileStore shutdown request received";
+                } else {
+                    return "";
+                }
             }
+        };
+    }
 
+    /**
+     * Returns a signal indicating that the file store is shutting down.
+     * @return  a shutdown signal
+     */
+    private Supplier<Boolean> newShutdownSignal() {
+        return new Supplier<Boolean>() {
+            @Override
+            public Boolean get() {
+                return shutdown;
+            }
         };
     }
 
@@ -881,7 +932,7 @@ public class FileStore implements Segmen
         SegmentNodeState after = compactor.compact(EMPTY_NODE, before, EMPTY_NODE);
 
         if (compactionCanceled.get()) {
-            gcMonitor.warn("TarMK compaction was canceled, not enough disk space available.");
+            gcMonitor.warn("TarMK compaction was canceled: {}", compactionCanceled);
             return;
         }
 
@@ -900,7 +951,7 @@ public class FileStore implements Segmen
                 after = compactor.compact(before, head, after);
 
                 if (compactionCanceled.get()) {
-                    gcMonitor.warn("TarMK compaction was canceled, not enough disk space available.");
+                    gcMonitor.warn("TarMK compaction was canceled: {}", compactionCanceled);
                     return;
                 }
 
@@ -973,6 +1024,9 @@ public class FileStore implements Segmen
 
     @Override
     public void close() {
+        // Flag the store as shutting / shut down
+        shutdown = true;
+
         // avoid deadlocks by closing (and joining) the background
         // threads before acquiring the synchronization lock
         closeAndLogOnFail(compactionThread);
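
A side note on the reworked cancel condition: it latches each cancellation reason
separately and reuses toString() as the human-readable explanation, which is what the
"TarMK compaction was canceled: {}" log calls above rely on (the supplier itself is
passed as the placeholder argument and rendered via toString()). A minimal sketch of
that idiom as a named class, with hypothetical constructor parameters rather than the
exact Oak fields:

    import java.util.concurrent.atomic.AtomicBoolean;

    import com.google.common.base.Supplier;

    class CancelCompaction implements Supplier<Boolean> {
        private final AtomicBoolean sufficientDiskSpace;
        private final Supplier<Boolean> shutdownSignal;

        // Latched reasons: they only ever transition from false to true
        private boolean outOfDiskSpace;
        private boolean shutdown;

        CancelCompaction(AtomicBoolean sufficientDiskSpace, Supplier<Boolean> shutdownSignal) {
            this.sufficientDiskSpace = sufficientDiskSpace;
            this.shutdownSignal = shutdownSignal;
        }

        @Override
        public Boolean get() {
            if (!sufficientDiskSpace.get()) {
                outOfDiskSpace = true;
            }
            if (shutdownSignal.get()) {
                shutdown = true;
            }
            return outOfDiskSpace || shutdown;
        }

        @Override
        public String toString() {
            if (outOfDiskSpace) {
                return "Not enough disk space available";
            } else if (shutdown) {
                return "FileStore shutdown request received";
            } else {
                return "";
            }
        }
    }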

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java?rev=1707681&r1=1707680&r2=1707681&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/CompactionEstimatorTest.java Fri Oct  9 09:33:56 2015
@@ -27,6 +27,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Random;
 
+import com.google.common.base.Suppliers;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -87,7 +88,7 @@ public class CompactionEstimatorTest {
         fileStore.flush();
         try {
             // should be at 66%
-            assertTrue(fileStore.estimateCompactionGain()
+            assertTrue(fileStore.estimateCompactionGain(Suppliers.ofInstance(false))
                     .estimateCompactionGain(0) > 60);
         } finally {
             fileStore.close();
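
For the test, Suppliers.ofInstance(false) supplies a constant "never stop" signal so the
estimation still runs to completion. A tiny illustration of such constant signals (not
the actual test code):

    import com.google.common.base.Supplier;
    import com.google.common.base.Suppliers;

    public class StopSignalDemo {
        public static void main(String[] args) {
            Supplier<Boolean> never = Suppliers.ofInstance(false);  // estimation runs to the end
            Supplier<Boolean> always = Suppliers.ofInstance(true);  // estimation stops immediately

            System.out.println(never.get());   // false
            System.out.println(always.get());  // true
        }
    }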