You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/05/11 14:37:07 UTC
svn commit: r1743381 - in /jackrabbit/oak/trunk/oak-segment-tar/src:
main/java/org/apache/jackrabbit/oak/segment/
main/java/org/apache/jackrabbit/oak/segment/file/
test/java/org/apache/jackrabbit/oak/segment/
Author: mduerig
Date: Wed May 11 14:37:07 2016
New Revision: 1743381
URL: http://svn.apache.org/viewvc?rev=1743381&view=rev
Log:
OAK-4280: Compaction cannot be cancelled
Introduce a writeNode method that can be cancelled
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java?rev=1743381&r1=1743380&r2=1743381&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java Wed May 11 14:37:07 2016
@@ -52,8 +52,12 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
import javax.jcr.PropertyType;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.io.Closeables;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.PropertyState;
@@ -261,6 +265,13 @@ public class SegmentWriter {
return new SegmentPropertyState(id, state.getName(), state.getType());
}
+ /**
+ * Write a node state
+ * @param state node state to write
+ * @return segment node state equal to {@code state}
+ * @throws IOException
+ */
+ @Nonnull
public SegmentNodeState writeNode(final NodeState state) throws IOException {
return new SegmentNodeState(
writeOperationHandler.execute(new SegmentWriteOperation() {
@@ -272,14 +283,56 @@ public class SegmentWriter {
}
/**
+ * Write a node state, unless cancelled
+ * @param state node state to write
+ * @param cancel supplier to signal cancellation of this write operation
+ * @return segment node state equal to {@code state} or {@code null} if cancelled.
+ * @throws IOException
+ */
+ @CheckForNull
+ public SegmentNodeState writeNode(final NodeState state, Supplier<Boolean> cancel)
+ throws IOException {
+ try {
+ return new SegmentNodeState(writeOperationHandler.execute(new SegmentWriteOperation(cancel) {
+ @Override
+ public RecordId execute(SegmentBufferWriter writer) throws IOException {
+ return with(writer).writeNode(state, 0);
+ }
+ }));
+ } catch (SegmentWriteOperation.CancelledWriteException e) {
+ return null;
+ }
+ }
+
+ /**
* This {@code WriteOperation} implementation is used internally to provide
* context to a recursive chain of calls without having pass the context
* as a separate argument (a poor mans monad). As such it is entirely
* <em>not thread safe</em>.
*/
private abstract class SegmentWriteOperation implements WriteOperation {
+
+ /**
+ * This exception is used internally to signal cancellation of a (recursive)
+ * write node operation.
+ */
+ private class CancelledWriteException extends IOException {
+ public CancelledWriteException() {
+ super("Cancelled write operation");
+ }
+ }
+
+ private final Supplier<Boolean> cancel;
private SegmentBufferWriter writer;
+ protected SegmentWriteOperation(Supplier<Boolean> cancel) {
+ this.cancel = cancel;
+ }
+
+ protected SegmentWriteOperation() {
+ this(Suppliers.ofInstance(false));
+ }
+
@Override
public abstract RecordId execute(SegmentBufferWriter writer) throws IOException;
@@ -783,6 +836,10 @@ public class SegmentWriter {
}
private RecordId writeNode(NodeState state, int depth) throws IOException {
+ if (cancel.get()) {
+ // Poor man's Either Monad
+ throw new CancelledWriteException();
+ }
if (state instanceof SegmentNodeState) {
SegmentNodeState sns = ((SegmentNodeState) state);
if (hasSegment(sns)) {
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java?rev=1743381&r1=1743380&r2=1743381&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java Wed May 11 14:37:07 2016
@@ -581,7 +581,7 @@ public class FileStore implements Segmen
return headId.getSegment().getGcGen();
}
- public boolean maybeCompact(boolean cleanup) throws IOException {
+ public void maybeCompact(boolean cleanup) throws IOException {
gcMonitor.info("TarMK GC #{}: started", GC_COUNT.incrementAndGet());
Runtime runtime = Runtime.getRuntime();
@@ -601,11 +601,9 @@ public class FileStore implements Segmen
if (cleanup) {
cleanupNeeded.set(!gcOptions.isPaused());
}
- return false;
}
Stopwatch watch = Stopwatch.createStarted();
- boolean compacted = false;
int gainThreshold = gcOptions.getGainThreshold();
boolean sufficientEstimatedGain = true;
@@ -620,7 +618,6 @@ public class FileStore implements Segmen
CompactionGainEstimate estimate = estimateCompactionGain(shutdown);
if (shutdown.get()) {
gcMonitor.info("TarMK GC #{}: estimation interrupted. Skipping compaction.", GC_COUNT);
- return false;
}
long gain = estimate.estimateCompactionGain();
@@ -654,12 +651,10 @@ public class FileStore implements Segmen
if (compact()) {
cleanupNeeded.set(cleanup);
}
- compacted = true;
} else {
gcMonitor.skipped("TarMK GC #{}: compaction paused", GC_COUNT);
}
}
- return compacted;
}
static Map<Integer, Map<Character, File>> collectFiles(File directory) {
@@ -1070,9 +1065,8 @@ public class FileStore implements Segmen
// Make the capacity and initial depth of the deduplication cache configurable
final DeduplicationCache<String> nodeCache = new DeduplicationCache<String>(1000000, 20);
- // FIXME OAK-4280: Compaction cannot be cancelled
// FIXME OAK-4279: Rework offline compaction
- // This way of compacting has not progress logging and cannot be cancelled
+ // This way of compacting has no progress logging
final int gcGeneration = tracker.getGcGen() + 1;
SegmentWriter writer = new SegmentWriter(this, tracker.getSegmentVersion(),
new SegmentBufferWriter(this, tracker.getSegmentVersion(), "c", gcGeneration),
@@ -1092,7 +1086,13 @@ public class FileStore implements Segmen
GC_COUNT, existing);
}
- SegmentNodeState after = compact(writer, before);
+ Supplier<Boolean> cancel = newCancelCompactionCondition();
+ SegmentNodeState after = compact(writer, before, cancel);
+ if (after == null) {
+ gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+ return false;
+ }
+
gcMonitor.info("TarMK GC #{}: compacted {} to {}",
GC_COUNT, before.getRecordId(), after.getRecordId());
@@ -1107,7 +1107,12 @@ public class FileStore implements Segmen
gcMonitor.info("TarMK GC #{}: compaction detected concurrent commits while compacting. " +
"Compacting these commits. Cycle {}", GC_COUNT, cycles);
SegmentNodeState head = getHead();
- after = compact(writer, head);
+ after = compact(writer, head, cancel);
+ if (after == null) {
+ gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+ return false;
+ }
+
gcMonitor.info("TarMK GC #{}: compacted {} against {} to {}",
GC_COUNT, head.getRecordId(), before.getRecordId(), after.getRecordId());
before = head;
@@ -1126,9 +1131,10 @@ public class FileStore implements Segmen
GC_COUNT, cycles - 1);
if (gcOptions.getForceAfterFail()) {
gcMonitor.info("TarMK GC #{}: compaction force compacting remaining commits", GC_COUNT);
- if (!forceCompact(writer)) {
+ if (!forceCompact(writer, cancel)) {
gcMonitor.warn("TarMK GC #{}: compaction failed to force compact remaining commits. " +
- "Most likely compaction didn't get exclusive access to the store. Cleaning up.",
+ "Most likely compaction didn't get exclusive access to the store or was " +
+ "prematurely cancelled. Cleaning up.",
GC_COUNT);
cleanup(new Predicate<Integer>() {
@Override
@@ -1154,17 +1160,28 @@ public class FileStore implements Segmen
}
}
- private static SegmentNodeState compact(SegmentWriter writer, NodeState node) throws IOException {
- SegmentNodeState compacted = writer.writeNode(node);
- writer.flush();
+ private static SegmentNodeState compact(SegmentWriter writer, NodeState node,
+ Supplier<Boolean> cancel)
+ throws IOException {
+ SegmentNodeState compacted = writer.writeNode(node, cancel);
+ if (compacted != null) {
+ writer.flush();
+ }
return compacted;
}
- private boolean forceCompact(SegmentWriter writer) throws InterruptedException, IOException {
+ private boolean forceCompact(SegmentWriter writer, Supplier<Boolean> cancel)
+ throws InterruptedException, IOException {
if (rwLock.writeLock().tryLock(gcOptions.getLockWaitTime(), TimeUnit.SECONDS)) {
try {
SegmentNodeState head = getHead();
- return setHead(head, compact(writer, head));
+ SegmentNodeState after = compact(writer, head, cancel);
+ if (after == null) {
+ gcMonitor.info("TarMK GC #{}: compaction cancelled.", GC_COUNT);
+ return false;
+ } else {
+ return setHead(head, after);
+ }
} finally {
rwLock.writeLock().unlock();
}
@@ -1569,7 +1586,7 @@ public class FileStore implements Segmen
}
@Override
- public boolean maybeCompact(boolean cleanup) {
+ public void maybeCompact(boolean cleanup) {
throw new UnsupportedOperationException("Read Only Store");
}
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java?rev=1743381&r1=1743380&r2=1743381&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java Wed May 11 14:37:07 2016
@@ -20,7 +20,9 @@
package org.apache.jackrabbit.oak.segment;
import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.lang.Integer.getInteger;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
import static org.apache.jackrabbit.oak.api.Type.STRING;
import static org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture.SEGMENT_MK;
@@ -182,6 +184,55 @@ public class CompactionAndCleanupIT {
return nodeStore.createBlob(new ByteArrayInputStream(data));
}
+ @Test
+ public void testCancelCompaction()
+ throws Throwable {
+ final FileStore fileStore = FileStore.builder(getFileStoreFolder())
+ .withGCOptions(DEFAULT.setRetainedGenerations(2))
+ .withMaxFileSize(1)
+ .build();
+ SegmentNodeStore nodeStore = SegmentNodeStore.builder(fileStore).build();
+
+ NodeBuilder builder = nodeStore.getRoot().builder();
+ addNodes(builder, 10);
+ nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ fileStore.flush();
+
+ FutureTask<Boolean> async = runAsync(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws IOException {
+ boolean cancelled = false;
+ for (int k = 0; !cancelled && k < 1000; k++) {
+ cancelled = !fileStore.compact();
+ }
+ return cancelled;
+ }
+ });
+
+ // Give the compaction thread a head start
+ sleepUninterruptibly(1, SECONDS);
+
+ fileStore.close();
+ try {
+ assertTrue(async.get());
+ } catch (ExecutionException e) {
+ if (!(e.getCause() instanceof IllegalStateException)) {
+ // Throw cause unless this is an ISE thrown by the
+ // store being already closed, which is kinda expected
+ throw e.getCause();
+ }
+ }
+ }
+
+ private static void addNodes(NodeBuilder builder, int depth) {
+ if (depth > 0) {
+ NodeBuilder child1 = builder.setChildNode("1");
+ addNodes(child1, depth - 1);
+ NodeBuilder child2 = builder.setChildNode("2");
+ addNodes(child2, depth - 1);
+ }
+ }
+
/**
* Regression test for OAK-2192 testing for mixed segments. This test does not
* cover OAK-3348. I.e. it does not assert the segment graph is free of cross
@@ -308,6 +359,12 @@ public class CompactionAndCleanupIT {
return task.get();
}
+ private static <T> FutureTask<T> runAsync(Callable<T> callable) {
+ FutureTask<T> task = new FutureTask<T>(callable);
+ new Thread(task).start();
+ return task;
+ }
+
/**
* Test asserting OAK-3348: Cross gc sessions might introduce references to pre-compacted segments
*/
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java?rev=1743381&r1=1743380&r2=1743381&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/RecordTest.java Wed May 11 14:37:07 2016
@@ -46,6 +46,7 @@ import java.util.Map;
import java.util.Random;
import com.google.common.base.Charsets;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.api.CommitFailedException;
@@ -424,4 +425,11 @@ public class RecordTest {
assertNotNull(state.getProperty("jcr:mixinTypes"));
}
+ @Test
+ public void testCancel() throws IOException {
+ NodeBuilder builder = EMPTY_NODE.builder();
+ NodeState state = writer.writeNode(builder.getNodeState(), Suppliers.ofInstance(true));
+ assertNull(state);
+ }
+
}