You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2016/09/28 07:42:54 UTC

svn commit: r1762612 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java

Author: mreutegg
Date: Wed Sep 28 07:42:53 2016
New Revision: 1762612

URL: http://svn.apache.org/viewvc?rev=1762612&view=rev
Log:
OAK-4826: Auto removal of orphaned checkpoints

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1762612&r1=1762611&r2=1762612&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Wed Sep 28 07:42:53 2016
@@ -30,6 +30,7 @@ import static org.apache.jackrabbit.oak.
 import java.io.Closeable;
 import java.util.Calendar;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -175,6 +176,28 @@ public class AsyncIndexUpdate implements
 
     private boolean closed;
 
+    /**
+     * The checkpoint cleanup interval in minutes. Defaults to 5 minutes.
+     * Setting it to a negative value disables automatic cleanup. See OAK-4826.
+     */
+    private final int cleanupIntervalMinutes = getCleanupIntervalMinutes();
+
+    private static int getCleanupIntervalMinutes() {
+        int value = 5;
+        try {
+            value = Integer.parseInt(System.getProperty(
+                    "oak.async.checkpointCleanupIntervalMinutes", String.valueOf(value)));
+        } catch (NumberFormatException e) {
+            // use default
+        }
+        return value;
+    }
+
+    /**
+     * The time in minutes since the epoch when the last checkpoint cleanup ran.
+     */
+    private long lastCheckpointCleanUpTime;
+
     public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
             @Nonnull IndexEditorProvider provider, boolean switchOnSync) {
         this.name = checkNotNull(name);
@@ -457,6 +480,7 @@ public class AsyncIndexUpdate implements
         boolean threadNameChanged = false;
         String afterCheckpoint = store.checkpoint(lifetime, ImmutableMap.of(
                 "creator", AsyncIndexUpdate.class.getSimpleName(),
+                "created", afterTime,
                 "thread", oldThreadName,
                 "name", name));
         NodeState after = store.retrieve(afterCheckpoint);
@@ -509,12 +533,65 @@ public class AsyncIndexUpdate implements
                             checkpointToRelease);
                 }
             }
+            maybeCleanUpCheckpoints();
 
             if (updatePostRunStatus) {
                 postAsyncRunStatsStatus(indexStats);
             }
         }
     }
+
+    private void maybeCleanUpCheckpoints() {
+        // clean up every five minutes
+        long currentMinutes = TimeUnit.MILLISECONDS.toMinutes(
+                System.currentTimeMillis());
+        if (!indexStats.isFailing()
+                && cleanupIntervalMinutes > -1
+                && lastCheckpointCleanUpTime + cleanupIntervalMinutes < currentMinutes) {
+            try {
+                cleanUpCheckpoints();
+            } catch (Throwable e) {
+                log.warn("Checkpoint clean up failed", e);
+            }
+            lastCheckpointCleanUpTime = currentMinutes;
+        }
+    }
+
+    void cleanUpCheckpoints() {
+        log.debug("Cleaning up orphaned checkpoints");
+        Set<String> keep = newHashSet();
+        String cp = indexStats.getReferenceCheckpoint();
+        if (cp == null) {
+            log.warn("No reference checkpoint set in index stats");
+            return;
+        }
+        keep.add(cp);
+        keep.addAll(indexStats.tempCps);
+        Map<String, String> info = store.checkpointInfo(cp);
+        String value = info.get("created");
+        if (value != null) {
+            // remove unreferenced AsyncIndexUpdate checkpoints:
+            // - without 'created' info (checkpoint created before OAK-4826)
+            // or
+            // - 'created' value older than the current reference
+            long current = ISO8601.parse(value).getTimeInMillis();
+            for (String checkpoint : store.checkpoints()) {
+                info = store.checkpointInfo(checkpoint);
+                String creator = info.get("creator");
+                String created = info.get("created");
+                String name = info.get("name");
+                if (!keep.contains(checkpoint)
+                        && this.name.equals(name)
+                        && AsyncIndexUpdate.class.getSimpleName().equals(creator)
+                        && (created == null || ISO8601.parse(created).getTimeInMillis() < current)) {
+                    if (store.release(checkpoint)) {
+                        log.info("Removed orphaned checkpoint '{}' {}",
+                                checkpoint, info);
+                    }
+                }
+            }
+        }
+    }
 
     protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
                                                          String name, long leaseTimeOut, String beforeCheckpoint,

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1762612&r1=1762611&r2=1762612&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Wed Sep 28 07:42:53 2016
@@ -73,6 +73,7 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.state.ProxyNodeStore;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.Test;
 
 import ch.qos.logback.classic.Level;
@@ -678,6 +679,77 @@ public class AsyncIndexUpdateTest {
         }
     }
 
+    // OAK-4826
+    @Test
+    public void cpCleanupOrphaned() throws Exception {
+        Clock clock = Clock.SIMPLE;
+        MemoryNodeStore store = new MemoryNodeStore();
+        // prepare index and initial content
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        assertTrue("Expecting no checkpoints",
+                store.listCheckpoints().size() == 0);
+
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+        async.run();
+        assertTrue("Expecting one checkpoint",
+                store.listCheckpoints().size() == 1);
+        String cp = store.listCheckpoints().iterator().next();
+        Map<String, String> info = store.checkpointInfo(cp);
+
+        builder = store.getRoot().builder();
+        builder.child("testRoot").setProperty("foo", "def");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        // wait until currentTimeMillis() changes. this ensures
+        // the created value for the checkpoint is different
+        // from the previous checkpoint.
+        clock.waitUntil(clock.getTime() + 1);
+        async.run();
+        assertTrue("Expecting one checkpoint",
+                store.listCheckpoints().size() == 1);
+        cp = store.listCheckpoints().iterator().next();
+
+        // create a new checkpoint with the info from the first checkpoint
+        // this simulates an orphaned checkpoint that should be cleaned up
+        assertNotNull(store.checkpoint(TimeUnit.HOURS.toMillis(1), info));
+        assertTrue("Expecting two checkpoints",
+                store.listCheckpoints().size() == 2);
+
+        async.cleanUpCheckpoints();
+        assertTrue("Expecting one checkpoint",
+                store.listCheckpoints().size() == 1);
+        assertEquals(cp, store.listCheckpoints().iterator().next());
+    }
+
+    @Test
+    public void disableCheckpointCleanup() throws Exception {
+        String propertyName = "oak.async.checkpointCleanupIntervalMinutes";
+        MemoryNodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        try {
+            System.setProperty(propertyName, "-1");
+            final AtomicBoolean cleaned = new AtomicBoolean();
+            AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
+                @Override
+                void cleanUpCheckpoints() {
+                    cleaned.set(true);
+                    super.cleanUpCheckpoints();
+                }
+            };
+            async.run();
+            assertFalse(cleaned.get());
+        } finally {
+            System.clearProperty(propertyName);
+        }
+    }
+
     /**
      * OAK-2203 Test reindex behavior on an async index when the index provider is missing
      * for a given type