You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2014/06/09 06:24:01 UTC

svn commit: r1601309 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java

Author: jukka
Date: Mon Jun  9 04:24:01 2014
New Revision: 1601309

URL: http://svn.apache.org/r1601309
Log:
OAK-1877: Hourly async reindexing on an idle instance

Increase the lifetime of async indexer checkpoints to 1000 days (about 2.7 years),
but only create them when there actually are some changes in the repository.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1601309&r1=1601308&r2=1601309&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Mon Jun  9 04:24:01 2014
@@ -19,7 +19,6 @@
 package org.apache.jackrabbit.oak.plugins.index;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.jackrabbit.oak.api.Type.STRING;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
 import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_DONE;
 import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_RUNNING;
@@ -35,8 +34,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 
-import com.google.common.base.Objects;
-
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
@@ -53,11 +50,14 @@ import org.apache.jackrabbit.oak.spi.com
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.util.ISO8601;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
+
 public class AsyncIndexUpdate implements Runnable {
 
     private static final Logger log = LoggerFactory
@@ -69,7 +69,7 @@ public class AsyncIndexUpdate implements
      */
     private static final String ASYNC = ":async";
 
-    private static final long DEFAULT_LIFETIME = TimeUnit.HOURS.toMillis(1);
+    private static final long DEFAULT_LIFETIME = TimeUnit.DAYS.toMillis(1000);
 
     private static final CommitFailedException CONCURRENT_UPDATE = new CommitFailedException(
             "Async", 1, "Concurrent update detected");
@@ -142,43 +142,55 @@ public class AsyncIndexUpdate implements
         log.debug("Running background index task {}", name);
 
         if (isAlreadyRunning(store, name)) {
-            log.debug("Async job '{}' found to be already running. Skipping", name);
+            log.debug("The {} indexer is already running; skipping this update", name);
             return;
         }
 
+        NodeState before;
+        NodeState root = store.getRoot();
+        String refCheckpoint = root.getChildNode(ASYNC).getString(name);
+        if (refCheckpoint != null) {
+            NodeState state = store.retrieve(refCheckpoint);
+            if (state == null) {
+                log.warn("Failed to retrieve previously indexed checkpoint {};"
+                        + " rerunning the initial {} index update",
+                        refCheckpoint, name);
+                before = MISSING_NODE;
+            } else if (noVisibleChanges(state, root)) {
+                log.debug("No changes since last checkpoint;"
+                        + " skipping the {} index update", name);
+                return;
+            } else {
+                before = state;
+            }
+        } else {
+            log.info("Initial {} index update", name);
+            before = MISSING_NODE;
+        }
+
         String checkpoint = store.checkpoint(lifetime);
         NodeState after = store.retrieve(checkpoint);
         if (after == null) {
-            log.debug("Unable to retrieve checkpoint {}", checkpoint);
+            log.warn("Unable to retrieve newly created checkpoint {},"
+                    + " skipping the {} index update", checkpoint, name);
             return;
         }
 
-        NodeBuilder builder = after.builder();
+        NodeBuilder builder = store.getRoot().builder();
         NodeBuilder async = builder.child(ASYNC);
-        String refCheckpoint = null;
-
-        NodeState before = null;
-        final PropertyState state = async.getProperty(name);
-        if (state != null && state.getType() == STRING) {
-            refCheckpoint = state.getValue(STRING);
-            before = store.retrieve(refCheckpoint);
-        }
-        if (before == null) {
-            before = MISSING_NODE;
-        }
 
         AsyncUpdateCallback callback = new AsyncUpdateCallback();
         preAsyncRunStatsStats(indexStats);
-        IndexUpdate indexUpdate = new IndexUpdate(provider, name, after,
-                builder, callback);
+        IndexUpdate indexUpdate = new IndexUpdate(
+                provider, name, after, builder, callback);
 
-        CommitFailedException exception = EditorDiff.process(indexUpdate,
-                before, after);
+        CommitFailedException exception = EditorDiff.process(
+                indexUpdate, before, after);
         if (exception == null) {
             if (callback.dirty) {
                 async.setProperty(name, checkpoint);
                 try {
-                    store.merge(builder, newCommitHook(name, state),
+                    store.merge(builder, newCommitHook(name, refCheckpoint),
                             CommitInfo.EMPTY);
                 } catch (CommitFailedException e) {
                     if (e != CONCURRENT_UPDATE) {
@@ -186,8 +198,8 @@ public class AsyncIndexUpdate implements
                     }
                 }
                 if (switchOnSync) {
-                    reindexedDefinitions.addAll(indexUpdate
-                            .getReindexedDefinitions());
+                    reindexedDefinitions.addAll(
+                            indexUpdate.getReindexedDefinitions());
                 }
             } else if (switchOnSync) {
                 log.debug("No changes detected after diff, will try to switch to synchronous updates on "
@@ -206,7 +218,7 @@ public class AsyncIndexUpdate implements
                 }
 
                 try {
-                    store.merge(builder, newCommitHook(name, state),
+                    store.merge(builder, newCommitHook(name, refCheckpoint),
                             CommitInfo.EMPTY);
                     reindexedDefinitions.clear();
                 } catch (CommitFailedException e) {
@@ -219,12 +231,14 @@ public class AsyncIndexUpdate implements
         postAsyncRunStatsStatus(indexStats);
 
         // checkpoints cleanup
-        if (refCheckpoint != null) {
-            store.release(refCheckpoint);
-        }
         if (exception != null || (exception == null && !callback.dirty)) {
-            // error? cleanup current cp too
+            log.debug("The {} index update failed; releasing the related checkpoint {}",
+                    name, checkpoint);
             store.release(checkpoint);
+        } else {
+            log.debug("The {} index update succeeded; releasing the previous checkpoint {}",
+                    name, refCheckpoint);
+            store.release(refCheckpoint);
         }
 
         if (exception != null) {
@@ -240,20 +254,20 @@ public class AsyncIndexUpdate implements
         }
     }
 
-    private static CommitHook newCommitHook(final String name,
-            final PropertyState state) throws CommitFailedException {
+    private static CommitHook newCommitHook(
+            final String name, final String checkpoint) {
         return new CompositeHook(
                 new ConflictHook(new AnnotatingConflictHandler()),
                 new EditorHook(new ConflictValidatorProvider()),
                 new CommitHook() {
-            @Override
-            @Nonnull
-            public NodeState processCommit(NodeState before, NodeState after,
-                    CommitInfo info) throws CommitFailedException {
+            @Override @Nonnull
+            public NodeState processCommit(
+                    NodeState before, NodeState after, CommitInfo info)
+                    throws CommitFailedException {
                 // check for concurrent updates by this async task
-                PropertyState stateAfterRebase = before.getChildNode(ASYNC)
-                        .getProperty(name);
-                if (Objects.equal(state, stateAfterRebase)) {
+                String checkpointAfterRebase =
+                        before.getChildNode(ASYNC).getString(name);
+                if (Objects.equal(checkpoint, checkpointAfterRebase)) {
                     return postAsyncRunNodeStatus(after.builder(), name)
                             .getNodeState();
                 } else {
@@ -309,8 +323,8 @@ public class AsyncIndexUpdate implements
         stats.start(now());
     }
 
-    private static NodeBuilder postAsyncRunNodeStatus(NodeBuilder builder,
-            String name) {
+    private static NodeBuilder postAsyncRunNodeStatus(
+            NodeBuilder builder, String name) {
         String now = now();
         builder.getChildNode(INDEX_DEFINITIONS_NAME)
                 .setProperty(name + "-status", STATUS_DONE)
@@ -375,4 +389,55 @@ public class AsyncIndexUpdate implements
         }
     }
 
+    /**
+     * Checks whether there are no visible changes between the given states.
+     * Hidden items, i.e. properties and child nodes whose names start with
+     * {@code ':'} (such as {@code :async}), are ignored.
+     *
+     * @param before the base state, typically the last indexed checkpoint
+     * @param after the state to compare against the base
+     * @return {@code true} if every difference (if any) is in a hidden item
+     */
+    private static boolean noVisibleChanges(NodeState before, NodeState after) {
+        return after.compareAgainstBaseState(before, new NodeStateDiff() {
+            @Override
+            public boolean propertyAdded(PropertyState after) {
+                return isHidden(after.getName());
+            }
+            @Override
+            public boolean propertyChanged(
+                    PropertyState before, PropertyState after) {
+                return isHidden(after.getName());
+            }
+            @Override
+            public boolean propertyDeleted(PropertyState before) {
+                return isHidden(before.getName());
+            }
+            @Override
+            public boolean childNodeAdded(String name, NodeState after) {
+                return isHidden(name);
+            }
+            @Override
+            public boolean childNodeChanged(
+                    String name, NodeState before, NodeState after) {
+                // a changed hidden subtree is accepted as-is; for a visible
+                // one, recurse with this same diff handler
+                return isHidden(name)
+                        || after.compareAgainstBaseState(before, this);
+            }
+            @Override
+            public boolean childNodeDeleted(String name, NodeState before) {
+                return isHidden(name);
+            }
+        });
+    }
+
+    /**
+     * Returns whether the given property or child node name denotes a hidden
+     * item, i.e. starts with {@code ':'}. An empty name is not hidden.
+     */
+    private static boolean isHidden(String name) {
+        return name.startsWith(":");
+    }
+
 }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1601309&r1=1601308&r2=1601309&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Mon Jun  9 04:24:01 2014
@@ -269,6 +269,7 @@ public class AsyncIndexUpdateTest {
                 async.run();
             }
         });
+
         // drain checkpoint permits
         checkpoint.acquireUninterruptibly(checkpoint.availablePermits());
         // block NodeStore.retrieve()
@@ -276,6 +277,7 @@ public class AsyncIndexUpdateTest {
         t.start();
 
         // wait until async update called checkpoint
+        retrieve.release();
         checkpoint.acquireUninterruptibly();
         builder = store.getRoot().builder();
         builder.child("child").remove();
@@ -334,8 +336,13 @@ public class AsyncIndexUpdateTest {
         locks.put(t, s);
         t.start();
 
+        // make some unrelated changes to trigger indexing
+        builder = store.getRoot().builder();
+        builder.setChildNode("dummy").setProperty("foo", "bar");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
         while (!s.hasQueuedThreads()) {
-            // busy wait
+            Thread.yield();
         }
 
         // introduce a conflict