You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2016/09/28 07:42:54 UTC
svn commit: r1762612 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Author: mreutegg
Date: Wed Sep 28 07:42:53 2016
New Revision: 1762612
URL: http://svn.apache.org/viewvc?rev=1762612&view=rev
Log:
OAK-4826: Auto removal of orphaned checkpoints
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1762612&r1=1762611&r2=1762612&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Wed Sep 28 07:42:53 2016
@@ -30,6 +30,7 @@ import static org.apache.jackrabbit.oak.
import java.io.Closeable;
import java.util.Calendar;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -175,6 +176,28 @@ public class AsyncIndexUpdate implements
private boolean closed;
+ /**
+ * The checkpoint cleanup interval in minutes. Defaults to 5 minutes.
+ * Setting it to a negative value disables automatic cleanup. See OAK-4826.
+ */
+ private final int cleanupIntervalMinutes = getCleanupIntervalMinutes();
+
+ private static int getCleanupIntervalMinutes() {
+ int value = 5;
+ try {
+ value = Integer.parseInt(System.getProperty(
+ "oak.async.checkpointCleanupIntervalMinutes", String.valueOf(value)));
+ } catch (NumberFormatException e) {
+ // use default
+ }
+ return value;
+ }
+
+ /**
+ * The time in minutes since the epoch when the last checkpoint cleanup ran.
+ */
+ private long lastCheckpointCleanUpTime;
+
public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
@Nonnull IndexEditorProvider provider, boolean switchOnSync) {
this.name = checkNotNull(name);
@@ -457,6 +480,7 @@ public class AsyncIndexUpdate implements
boolean threadNameChanged = false;
String afterCheckpoint = store.checkpoint(lifetime, ImmutableMap.of(
"creator", AsyncIndexUpdate.class.getSimpleName(),
+ "created", afterTime,
"thread", oldThreadName,
"name", name));
NodeState after = store.retrieve(afterCheckpoint);
@@ -509,12 +533,65 @@ public class AsyncIndexUpdate implements
checkpointToRelease);
}
}
+ maybeCleanUpCheckpoints();
if (updatePostRunStatus) {
postAsyncRunStatsStatus(indexStats);
}
}
}
+
+ private void maybeCleanUpCheckpoints() {
+ // clean up every five minutes
+ long currentMinutes = TimeUnit.MILLISECONDS.toMinutes(
+ System.currentTimeMillis());
+ if (!indexStats.isFailing()
+ && cleanupIntervalMinutes > -1
+ && lastCheckpointCleanUpTime + cleanupIntervalMinutes < currentMinutes) {
+ try {
+ cleanUpCheckpoints();
+ } catch (Throwable e) {
+ log.warn("Checkpoint clean up failed", e);
+ }
+ lastCheckpointCleanUpTime = currentMinutes;
+ }
+ }
+
+ void cleanUpCheckpoints() {
+ log.debug("Cleaning up orphaned checkpoints");
+ Set<String> keep = newHashSet();
+ String cp = indexStats.getReferenceCheckpoint();
+ if (cp == null) {
+ log.warn("No reference checkpoint set in index stats");
+ return;
+ }
+ keep.add(cp);
+ keep.addAll(indexStats.tempCps);
+ Map<String, String> info = store.checkpointInfo(cp);
+ String value = info.get("created");
+ if (value != null) {
+ // remove unreferenced AsyncIndexUpdate checkpoints:
+ // - without 'created' info (checkpoint created before OAK-4826)
+ // or
+ // - 'created' value older than the current reference
+ long current = ISO8601.parse(value).getTimeInMillis();
+ for (String checkpoint : store.checkpoints()) {
+ info = store.checkpointInfo(checkpoint);
+ String creator = info.get("creator");
+ String created = info.get("created");
+ String name = info.get("name");
+ if (!keep.contains(checkpoint)
+ && this.name.equals(name)
+ && AsyncIndexUpdate.class.getSimpleName().equals(creator)
+ && (created == null || ISO8601.parse(created).getTimeInMillis() < current)) {
+ if (store.release(checkpoint)) {
+ log.info("Removed orphaned checkpoint '{}' {}",
+ checkpoint, info);
+ }
+ }
+ }
+ }
+ }
protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
String name, long leaseTimeOut, String beforeCheckpoint,
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1762612&r1=1762611&r2=1762612&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Wed Sep 28 07:42:53 2016
@@ -73,6 +73,7 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.state.ProxyNodeStore;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.Test;
import ch.qos.logback.classic.Level;
@@ -678,6 +679,77 @@ public class AsyncIndexUpdateTest {
}
}
+ // OAK-4826
+ @Test
+ public void cpCleanupOrphaned() throws Exception {
+ Clock clock = Clock.SIMPLE;
+ MemoryNodeStore store = new MemoryNodeStore();
+ // prepare index and initial content
+ NodeBuilder builder = store.getRoot().builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ builder.child("testRoot").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ assertTrue("Expecting no checkpoints",
+ store.listCheckpoints().size() == 0);
+
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+ AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+ async.run();
+ assertTrue("Expecting one checkpoint",
+ store.listCheckpoints().size() == 1);
+ String cp = store.listCheckpoints().iterator().next();
+ Map<String, String> info = store.checkpointInfo(cp);
+
+ builder = store.getRoot().builder();
+ builder.child("testRoot").setProperty("foo", "def");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ // wait until currentTimeMillis() changes. this ensures
+ // the created value for the checkpoint is different
+ // from the previous checkpoint.
+ clock.waitUntil(clock.getTime() + 1);
+ async.run();
+ assertTrue("Expecting one checkpoint",
+ store.listCheckpoints().size() == 1);
+ cp = store.listCheckpoints().iterator().next();
+
+ // create a new checkpoint with the info from the first checkpoint
+ // this simulates an orphaned checkpoint that should be cleaned up
+ assertNotNull(store.checkpoint(TimeUnit.HOURS.toMillis(1), info));
+ assertTrue("Expecting two checkpoints",
+ store.listCheckpoints().size() == 2);
+
+ async.cleanUpCheckpoints();
+ assertTrue("Expecting one checkpoint",
+ store.listCheckpoints().size() == 1);
+ assertEquals(cp, store.listCheckpoints().iterator().next());
+ }
+
+ @Test
+ public void disableCheckpointCleanup() throws Exception {
+ String propertyName = "oak.async.checkpointCleanupIntervalMinutes";
+ MemoryNodeStore store = new MemoryNodeStore();
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+ try {
+ System.setProperty(propertyName, "-1");
+ final AtomicBoolean cleaned = new AtomicBoolean();
+ AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
+ @Override
+ void cleanUpCheckpoints() {
+ cleaned.set(true);
+ super.cleanUpCheckpoints();
+ }
+ };
+ async.run();
+ assertFalse(cleaned.get());
+ } finally {
+ System.clearProperty(propertyName);
+ }
+ }
+
/**
* OAK-2203 Test reindex behavior on an async index when the index provider is missing
* for a given type