You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/05/11 14:37:07 UTC

svn commit: r1743381 - in /jackrabbit/oak/trunk/oak-segment-tar/src: main/java/org/apache/jackrabbit/oak/segment/ main/java/org/apache/jackrabbit/oak/segment/file/ test/java/org/apache/jackrabbit/oak/segment/

Author: mduerig
Date: Wed May 11 14:37:07 2016
New Revision: 1743381

URL: http://svn.apache.org/viewvc?rev=1743381&view=rev
Log:
OAK-4280: Compaction cannot be cancelled
Introduce a writeNode method that can be cancelled

Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java?rev=1743381&r1=1743380&r2=1743381&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java Wed May 11 14:37:07 2016
@@ -52,8 +52,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
 import javax.jcr.PropertyType;
 
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.io.Closeables;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.PropertyState;
@@ -261,6 +265,13 @@ public class SegmentWriter {
         return new SegmentPropertyState(id, state.getName(), state.getType());
     }
 
+    /**
+     * Write a node state
+     * @param state node state to write
+     * @return segment node state equal to {@code state}
+     * @throws IOException
+     */
+    @Nonnull
     public SegmentNodeState writeNode(final NodeState state) throws IOException {
         return new SegmentNodeState(
             writeOperationHandler.execute(new SegmentWriteOperation() {
@@ -272,14 +283,56 @@ public class SegmentWriter {
     }
 
     /**
+     * Write a node state, unless cancelled
+     * @param state   node state to write
+     * @param cancel  supplier to signal cancellation of this write operation
+     * @return segment node state equal to {@code state} or {@code null} if cancelled.
+     * @throws IOException
+     */
+    @CheckForNull
+    public SegmentNodeState writeNode(final NodeState state, Supplier<Boolean> cancel)
+    throws IOException {
+        try {
+            return new SegmentNodeState(writeOperationHandler.execute(new SegmentWriteOperation(cancel) {
+                @Override
+                public RecordId execute(SegmentBufferWriter writer) throws IOException {
+                    return with(writer).writeNode(state, 0);
+                }
+            }));
+        } catch (SegmentWriteOperation.CancelledWriteException e) {
+            return null;
+        }
+    }
+
+    /**
      * This {@code WriteOperation} implementation is used internally to provide
      * context to a recursive chain of calls without having to pass the context
      * as a separate argument (a poor man's monad). As such it is entirely
      * <em>not thread safe</em>.
      */
     private abstract class SegmentWriteOperation implements WriteOperation {
+
+        /**
+         * This exception is used internally to signal cancellation of a (recursive)
+         * write node operation.
+         */
+        private class CancelledWriteException extends IOException {
+            public CancelledWriteException() {
+                super("Cancelled write operation");
+            }
+        }
+
+        private final Supplier<Boolean> cancel;
         private SegmentBufferWriter writer;
 
+        protected SegmentWriteOperation(Supplier<Boolean> cancel) {
+            this.cancel = cancel;
+        }
+
+        protected SegmentWriteOperation() {
+            this(Suppliers.ofInstance(false));
+        }
+
         @Override
         public abstract RecordId execute(SegmentBufferWriter writer) throws IOException;
 
@@ -783,6 +836,10 @@ public class SegmentWriter {
         }
 
         private RecordId writeNode(NodeState state, int depth) throws IOException {
+            if (cancel.get()) {
+                // Poor man's Either Monad
+                throw new CancelledWriteException();
+            }
             if (state instanceof SegmentNodeState) {
                 SegmentNodeState sns = ((SegmentNodeState) state);
                 if (hasSegment(sns)) {

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1743381&r1=1743380&r2=1743381&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Wed May 11 14:37:07 2016
@@ -581,7 +581,7 @@ public class FileStore implements Segmen
         return headId.getSegment().getGcGen();
     }
 
-    public boolean maybeCompact(boolean cleanup) throws IOException {
+    public void maybeCompact(boolean cleanup) throws IOException {
         gcMonitor.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet());
 
         Runtime runtime = Runtime.getRuntime();
@@ -601,11 +601,9 @@ public class FileStore implements Segmen
             if (cleanup) {
                 cleanupNeeded.set(!gcOptions.isPaused());
             }
-            return false;
         }
 
         Stopwatch watch = Stopwatch.createStarted();
-        boolean compacted = false;
 
         int gainThreshold = gcOptions.getGainThreshold();
         boolean sufficientEstimatedGain = true;
@@ -620,7 +618,6 @@ public class FileStore implements Segmen
             CompactionGainEstimate estimate = estimateCompactionGain(shutdown);
             if (shutdown.get()) {
                 gcMonitor.info("TarMK GC #{}: estimation interrupted. Skipping compaction.", GC_COUNT);
-                return false;
             }
 
             long gain = estimate.estimateCompactionGain();
@@ -654,12 +651,10 @@ public class FileStore implements Segmen
                 if (compact()) {
                     cleanupNeeded.set(cleanup);
                 }
-                compacted = true;
             } else {
                 gcMonitor.skipped("TarMK GC #{}: compaction paused", GC_COUNT);
             }
         }
-        return compacted;
     }
 
     static Map<Integer, Map<Character, File>> collectFiles(File directory) {
@@ -1070,9 +1065,8 @@ public class FileStore implements Segmen
         // Make the capacity and initial depth of the deduplication cache configurable
         final DeduplicationCache<String> nodeCache = new DeduplicationCache<String>(1000000, 20);
 
-        // FIXME OAK-4280: Compaction cannot be cancelled
         // FIXME OAK-4279: Rework offline compaction
-        // This way of compacting has not progress logging and cannot be cancelled
+        // This way of compacting has no progress logging
         final int gcGeneration = tracker.getGcGen() + 1;
         SegmentWriter writer = new SegmentWriter(this, tracker.getSegmentVersion(),
             new SegmentBufferWriter(this, tracker.getSegmentVersion(), "c", gcGeneration),
@@ -1092,7 +1086,13 @@ public class FileStore implements Segmen
                     GC_COUNT, existing);
         }
 
-        SegmentNodeState after = compact(writer, before);
+        Supplier<Boolean> cancel = newCancelCompactionCondition();
+        SegmentNodeState after = compact(writer, before, cancel);
+        if (after == null) {
+            gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+            return false;
+        }
+
         gcMonitor.info("TarMK GC #{}: compacted {} to {}",
                 GC_COUNT, before.getRecordId(), after.getRecordId());
 
@@ -1107,7 +1107,12 @@ public class FileStore implements Segmen
                 gcMonitor.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
                     "Compacting these commits. Cycle {}", GC_COUNT, cycles);
                 SegmentNodeState head = getHead();
-                after = compact(writer, head);
+                after = compact(writer, head, cancel);
+                if (after == null) {
+                    gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+                    return false;
+                }
+
                 gcMonitor.info("TarMK GC #{}: compacted {} against {} to {}",
                         GC_COUNT, head.getRecordId(), before.getRecordId(), after.getRecordId());
                 before = head;
@@ -1126,9 +1131,10 @@ public class FileStore implements Segmen
                         GC_COUNT, cycles - 1);
                 if (gcOptions.getForceAfterFail()) {
                     gcMonitor.info("TarMK GC #{}: compaction force compacting remaining commits", GC_COUNT);
-                    if (!forceCompact(writer)) {
+                    if (!forceCompact(writer, cancel)) {
                         gcMonitor.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
-                            "Most likely compaction didn't get exclusive access to the store. Cleaning up.",
+                            "Most likely compaction didn't get exclusive access to the store or was " +
+                            "prematurely cancelled. Cleaning up.",
                             GC_COUNT);
                         cleanup(new Predicate<Integer>() {
                             @Override
@@ -1154,17 +1160,28 @@ public class FileStore implements Segmen
         }
     }
 
-    private static SegmentNodeState compact(SegmentWriter writer, NodeState node) throws IOException {
-        SegmentNodeState compacted = writer.writeNode(node);
-        writer.flush();
+    private static SegmentNodeState compact(SegmentWriter writer, NodeState node,
+                                            Supplier<Boolean> cancel)
+    throws IOException {
+        SegmentNodeState compacted = writer.writeNode(node, cancel);
+        if (compacted != null) {
+            writer.flush();
+        }
         return compacted;
     }
 
-    private boolean forceCompact(SegmentWriter writer) throws InterruptedException, IOException {
+    private boolean forceCompact(SegmentWriter writer, Supplier<Boolean> cancel)
+    throws InterruptedException, IOException {
         if (rwLock.writeLock().tryLock(gcOptions.getLockWaitTime(), TimeUnit.SECONDS)) {
             try {
                 SegmentNodeState head = getHead();
-                return setHead(head, compact(writer, head));
+                SegmentNodeState after = compact(writer, head, cancel);
+                if (after == null) {
+                    gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+                    return false;
+                } else {
+                    return setHead(head, after);
+                }
             } finally {
                 rwLock.writeLock().unlock();
             }
@@ -1569,7 +1586,7 @@ public class FileStore implements Segmen
         }
 
         @Override
-        public boolean maybeCompact(boolean cleanup) {
+        public void maybeCompact(boolean cleanup) {
             throw new UnsupportedOperationException("Read Only Store");
         }
     }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java?rev=1743381&r1=1743380&r2=1743381&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java Wed May 11 14:37:07 2016
@@ -20,7 +20,9 @@
 package org.apache.jackrabbit.oak.segment;
 
 import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.lang.Integer.getInteger;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
 import static org.apache.jackrabbit.oak.api.Type.STRING;
 import static org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture.SEGMENT_MK;
@@ -182,6 +184,55 @@ public class CompactionAndCleanupIT {
         return nodeStore.createBlob(new ByteArrayInputStream(data));
     }
 
+    @Test
+    public void testCancelCompaction()
+    throws Throwable {
+        final FileStore fileStore = FileStore.builder(getFileStoreFolder())
+                .withGCOptions(DEFAULT.setRetainedGenerations(2))
+                .withMaxFileSize(1)
+                .build();
+        SegmentNodeStore nodeStore = SegmentNodeStore.builder(fileStore).build();
+
+        NodeBuilder builder = nodeStore.getRoot().builder();
+        addNodes(builder, 10);
+        nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        fileStore.flush();
+
+        FutureTask<Boolean> async = runAsync(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws IOException {
+                boolean cancelled = false;
+                for (int k = 0; !cancelled && k < 1000; k++) {
+                    cancelled = !fileStore.compact();
+                }
+                return cancelled;
+            }
+        });
+
+        // Give the compaction thread a head start
+        sleepUninterruptibly(1, SECONDS);
+
+        fileStore.close();
+        try {
+            assertTrue(async.get());
+        } catch (ExecutionException e) {
+            if (!(e.getCause() instanceof IllegalStateException)) {
+                // Throw cause unless this is an ISE thrown by the
+                // store being already closed, which is kinda expected
+                throw e.getCause();
+            }
+        }
+    }
+
+    private static void addNodes(NodeBuilder builder, int depth) {
+        if (depth > 0) {
+            NodeBuilder child1 = builder.setChildNode("1");
+            addNodes(child1, depth - 1);
+            NodeBuilder child2 = builder.setChildNode("2");
+            addNodes(child2, depth - 1);
+        }
+    }
+
     /**
      * Regression test for OAK-2192 testing for mixed segments. This test does not
      * cover OAK-3348. I.e. it does not assert the segment graph is free of cross
@@ -308,6 +359,12 @@ public class CompactionAndCleanupIT {
         return task.get();
     }
 
+    private static <T> FutureTask<T> runAsync(Callable<T> callable) {
+        FutureTask<T> task = new FutureTask<T>(callable);
+        new Thread(task).start();
+        return task;
+    }
+
     /**
      * Test asserting OAK-3348: Cross gc sessions might introduce references to pre-compacted segments
      */

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java?rev=1743381&r1=1743380&r2=1743381&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java Wed May 11 14:37:07 2016
@@ -46,6 +46,7 @@ import java.util.Map;
 import java.util.Random;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableMap;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
@@ -424,4 +425,11 @@ public class RecordTest {
         assertNotNull(state.getProperty("jcr:mixinTypes"));
     }
 
+    @Test
+    public void testCancel() throws IOException {
+        NodeBuilder builder = EMPTY_NODE.builder();
+        NodeState state = writer.writeNode(builder.getNodeState(), Suppliers.ofInstance(true));
+        assertNull(state);
+    }
+
 }