You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2014/06/16 07:27:04 UTC

svn commit: r1602798 - in /jackrabbit/oak/branches/1.0: ./ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/

Author: jukka
Date: Mon Jun 16 05:27:04 2014
New Revision: 1602798

URL: http://svn.apache.org/r1602798
Log:
1.0.1: Merged revisions 1602796 and 1602797 (OAK-1877)

Modified:
    jackrabbit/oak/branches/1.0/   (props changed)
    jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
    jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java

Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
  Merged /jackrabbit/oak/trunk:r1602796-1602797

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1602798&r1=1602797&r2=1602798&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Mon Jun 16 05:27:04 2014
@@ -21,11 +21,9 @@ package org.apache.jackrabbit.oak.plugin
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
 import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_DONE;
-import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_RUNNING;
 import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
-import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
 
 import java.util.Calendar;
 import java.util.HashSet;
@@ -36,18 +34,15 @@ import javax.annotation.Nonnull;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
-import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
 import org.apache.jackrabbit.oak.plugins.commit.AnnotatingConflictHandler;
 import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
 import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
-import org.apache.jackrabbit.oak.plugins.value.Conversions;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
 import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
 import org.apache.jackrabbit.oak.spi.commit.EditorHook;
-import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
@@ -75,8 +70,9 @@ public class AsyncIndexUpdate implements
             "Async", 1, "Concurrent update detected");
 
     /**
-     * Timeout in minutes after which an async job would be considered as timed out. Another
-     * node in cluster would wait for timeout before taking over a running job
+     * Timeout in minutes after which an async job would be considered as
+     * timed out. Another node in cluster would wait for timeout before
+     * taking over a running job
      */
     private static final int ASYNC_TIMEOUT = 15;
 
@@ -125,13 +121,55 @@ public class AsyncIndexUpdate implements
      */
     private class AsyncUpdateCallback implements IndexUpdateCallback {
 
-        private boolean dirty = false;
+        /** The base checkpoint */
+        private final String checkpoint;
+
+        /** Expiration time of the last lease we committed */
+        private long lease;
+
+        private long updates = 0;
+
+        public AsyncUpdateCallback(String checkpoint)
+                throws CommitFailedException {
+            long now = System.currentTimeMillis();
+            this.checkpoint = checkpoint;
+            this.lease = now + 2 * ASYNC_TIMEOUT;
+
+            String leaseName = name + "-lease";
+            NodeState root = store.getRoot();
+            long beforeLease = root.getChildNode(ASYNC).getLong(leaseName);
+            if (beforeLease > now) {
+                throw CONCURRENT_UPDATE;
+            }
+
+            NodeBuilder builder = root.builder();
+            builder.child(ASYNC).setProperty(leaseName, lease);
+            mergeWithConcurrencyCheck(builder, checkpoint, beforeLease);
+        }
+
+        boolean isDirty() {
+            return updates > 0;
+        }
+
+        void close() throws CommitFailedException {
+            NodeBuilder builder = store.getRoot().builder();
+            NodeBuilder async = builder.child(ASYNC);
+            async.removeProperty(name + "-lease");
+            mergeWithConcurrencyCheck(builder, async.getString(name), lease);
+        }
 
         @Override
         public void indexUpdate() throws CommitFailedException {
-            if (!dirty) {
-                dirty = true;
-                preAsyncRun(store, name);
+            updates++;
+            if (updates % 100 == 0) {
+                long now = System.currentTimeMillis();
+                if (now + ASYNC_TIMEOUT > lease) {
+                    long newLease = now + 2 * ASYNC_TIMEOUT;
+                    NodeBuilder builder = store.getRoot().builder();
+                    builder.child(ASYNC).setProperty(name + "-lease", newLease);
+                    mergeWithConcurrencyCheck(builder, checkpoint, lease);
+                    lease = newLease;
+                }
             }
         }
 
@@ -141,26 +179,33 @@ public class AsyncIndexUpdate implements
     public synchronized void run() {
         log.debug("Running background index task {}", name);
 
-        if (isAlreadyRunning(store, name)) {
-            log.debug("The {} indexer is already running; skipping this update", name);
+        NodeState root = store.getRoot();
+
+        // check for concurrent updates
+        NodeState async = root.getChildNode(ASYNC);
+        if (async.getLong(name + "-lease") > System.currentTimeMillis()) {
+            log.debug("Another copy of the {} index update is already running;"
+                    + " skipping this update", name);
             return;
         }
 
+        // find the last indexed state, and check if there are recent changes
         NodeState before;
-        NodeState root = store.getRoot();
-        String refCheckpoint = root.getChildNode(ASYNC).getString(name);
-        if (refCheckpoint != null) {
-            NodeState state = store.retrieve(refCheckpoint);
+        String beforeCheckpoint = async.getString(name);
+        if (beforeCheckpoint != null) {
+            NodeState state = store.retrieve(beforeCheckpoint);
             if (state == null) {
                 log.warn("Failed to retrieve previously indexed checkpoint {};"
-                        + " rerunning the initial {} index update",
-                        refCheckpoint, name);
+                        + " re-running the initial {} index update",
+                        beforeCheckpoint, name);
+                beforeCheckpoint = null;
                 before = MISSING_NODE;
             } else if (noVisibleChanges(state, root)) {
                 log.debug("No changes since last checkpoint;"
                         + " skipping the {} index update", name);
                 return;
             } else {
+        System.err.println("Some visible changes detected");
                 before = state;
             }
         } else {
@@ -168,35 +213,74 @@ public class AsyncIndexUpdate implements
             before = MISSING_NODE;
         }
 
-        String checkpoint = store.checkpoint(lifetime);
-        NodeState after = store.retrieve(checkpoint);
+        // there are some recent changes, so let's create a new checkpoint
+        String afterCheckpoint = store.checkpoint(lifetime);
+        NodeState after = store.retrieve(afterCheckpoint);
         if (after == null) {
             log.warn("Unable to retrieve newly created checkpoint {},"
-                    + " skipping the {} index update", checkpoint, name);
+                    + " skipping the {} index update", afterCheckpoint, name);
             return;
         }
 
-        NodeBuilder builder = store.getRoot().builder();
-        NodeBuilder async = builder.child(ASYNC);
+        String checkpointToRelease = afterCheckpoint;
+        try {
+            updateIndex(before, beforeCheckpoint, after, afterCheckpoint);
 
-        AsyncUpdateCallback callback = new AsyncUpdateCallback();
+            // the update succeeded, i.e. it no longer fails
+            if (failing) {
+                log.info("Index update {} no longer fails", name);
+                failing = false;
+            }
+
+            // the update succeeded, so we can release the earlier checkpoint
+            // otherwise the new checkpoint associated with the failed update
+            // will get released in the finally block
+            checkpointToRelease = beforeCheckpoint;
+
+        } catch (CommitFailedException e) {
+            if (e == CONCURRENT_UPDATE) {
+                log.debug("Concurrent update detected in the {} index update", name);
+            } else if (failing) {
+                log.debug("The {} index update is still failing", name, e);
+            } else {
+                log.warn("The {} index update failed", name, e);
+                failing = true;
+            }
+
+        } finally {
+            if (checkpointToRelease != null) { // null during initial indexing
+                store.release(checkpointToRelease);
+            }
+        }
+    }
+
+    private void updateIndex(
+            NodeState before, String beforeCheckpoint,
+            NodeState after, String afterCheckpoint)
+            throws CommitFailedException {
+        // start collecting runtime statistics
         preAsyncRunStatsStats(indexStats);
-        IndexUpdate indexUpdate = new IndexUpdate(
-                provider, name, after, builder, callback);
 
-        CommitFailedException exception = EditorDiff.process(
-                indexUpdate, before, after);
-        if (exception == null) {
-            if (callback.dirty) {
-                async.setProperty(name, checkpoint);
-                try {
-                    store.merge(builder, newCommitHook(name, refCheckpoint),
-                            CommitInfo.EMPTY);
-                } catch (CommitFailedException e) {
-                    if (e != CONCURRENT_UPDATE) {
-                        exception = e;
-                    }
-                }
+        // create an update callback for tracking index updates
+        // and maintaining the update lease
+        AsyncUpdateCallback callback =
+                new AsyncUpdateCallback(beforeCheckpoint);
+        try {
+            NodeBuilder builder = store.getRoot().builder();
+
+            IndexUpdate indexUpdate =
+                    new IndexUpdate(provider, name, after, builder, callback);
+            CommitFailedException exception =
+                    EditorDiff.process(indexUpdate, before, after);
+            if (exception != null) {
+                throw exception;
+            }
+
+            if (callback.isDirty() || before == MISSING_NODE) {
+                builder.child(ASYNC).setProperty(name, afterCheckpoint);
+                mergeWithConcurrencyCheck(
+                        builder, beforeCheckpoint, callback.lease);
+
                 if (switchOnSync) {
                     reindexedDefinitions.addAll(
                             indexUpdate.getReindexedDefinitions());
@@ -204,9 +288,9 @@ public class AsyncIndexUpdate implements
                     postAsyncRunStatsStatus(indexStats);
                 }
             } else if (switchOnSync) {
-                log.debug("No changes detected after diff, will try to switch to synchronous updates on "
-                        + reindexedDefinitions);
-                async.setProperty(name, checkpoint);
+                log.debug("No changes detected after diff; will try to"
+                        + " switch to synchronous updates on {}",
+                        reindexedDefinitions);
 
                 // no changes after diff, switch to sync on the async defs
                 for (String path : reindexedDefinitions) {
@@ -219,126 +303,47 @@ public class AsyncIndexUpdate implements
                     }
                 }
 
-                try {
-                    store.merge(builder, newCommitHook(name, refCheckpoint),
-                            CommitInfo.EMPTY);
-                    reindexedDefinitions.clear();
-                    postAsyncRunStatsStatus(indexStats);
-                } catch (CommitFailedException e) {
-                    if (e != CONCURRENT_UPDATE) {
-                        exception = e;
-                    }
-                }
-            }
-        }
-
-        // checkpoints cleanup
-        if (exception != null || (exception == null && !callback.dirty)) {
-            log.debug("The {} index update failed; releasing the related checkpoint {}",
-                    name, checkpoint);
-            store.release(checkpoint);
-        } else {
-            if (refCheckpoint != null) {
-                log.debug(
-                        "The {} index update succeeded; releasing the previous checkpoint {}",
-                        name, refCheckpoint);
-                store.release(refCheckpoint);
+                mergeWithConcurrencyCheck(
+                        builder, beforeCheckpoint, callback.lease);
+                reindexedDefinitions.clear();
             }
+        } finally {
+            callback.close();
         }
 
-        if (exception != null) {
-            if (!failing) {
-                log.warn("Index update {} failed", name, exception);
-            }
-            failing = true;
-        } else {
-            if (failing) {
-                log.info("Index update {} no longer fails", name);
-            }
-            failing = false;
-        }
+        postAsyncRunStatsStatus(indexStats);
     }
 
-    private static CommitHook newCommitHook(
-            final String name, final String checkpoint) {
-        return new CompositeHook(
-                new ConflictHook(new AnnotatingConflictHandler()),
-                new EditorHook(new ConflictValidatorProvider()),
-                new CommitHook() {
+    private void mergeWithConcurrencyCheck(
+            NodeBuilder builder, final String checkpoint, final long lease)
+            throws CommitFailedException {
+        CommitHook concurrentUpdateCheck = new CommitHook() {
             @Override @Nonnull
             public NodeState processCommit(
                     NodeState before, NodeState after, CommitInfo info)
                     throws CommitFailedException {
                 // check for concurrent updates by this async task
-                String checkpointAfterRebase =
-                        before.getChildNode(ASYNC).getString(name);
-                if (Objects.equal(checkpoint, checkpointAfterRebase)) {
-                    return postAsyncRunNodeStatus(after.builder(), name)
-                            .getNodeState();
+                NodeState async = before.getChildNode(ASYNC);
+                if (Objects.equal(checkpoint, async.getString(name))
+                        && lease == async.getLong(name + "-lease")) {
+                    return after;
                 } else {
                     throw CONCURRENT_UPDATE;
                 }
             }
-        });
-    }
-
-    private static void preAsyncRun(NodeStore store, String name) throws CommitFailedException {
-        NodeBuilder builder = store.getRoot().builder();
-        preAsyncRunNodeStatus(builder, name);
-        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-    }
-
-    private static boolean isAlreadyRunning(NodeStore store, String name) {
-        NodeState indexState = store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME);
-
-        //Probably the first run
-        if (!indexState.exists()) {
-            return false;
-        }
-
-        //Check if already running or timed out
-        if (STATUS_RUNNING.equals(indexState.getString(name + "-status"))) {
-            PropertyState startTime = indexState.getProperty(name + "-start");
-            Calendar start = Conversions.convert(startTime.getValue(Type.DATE)).toCalendar();
-            Calendar now = Calendar.getInstance();
-            long delta = now.getTimeInMillis() - start.getTimeInMillis();
-
-            //Check if the job has timed out and we need to take over
-            if (TimeUnit.MILLISECONDS.toMinutes(delta) > ASYNC_TIMEOUT) {
-                log.info("Async job found which stated on {} has timed out in {} minutes. " +
-                        "This node would take over the job.",
-                        startTime.getValue(Type.DATE), ASYNC_TIMEOUT);
-                return false;
-            }
-            return true;
-        }
-
-        return false;
-    }
-
-    private static void preAsyncRunNodeStatus(NodeBuilder builder, String name) {
-        String now = now();
-        builder.getChildNode(INDEX_DEFINITIONS_NAME)
-                .setProperty(name + "-status", STATUS_RUNNING)
-                .setProperty(name + "-start", now, Type.DATE)
-                .removeProperty(name + "-done");
+        };
+        CompositeHook hooks = new CompositeHook(
+                new ConflictHook(new AnnotatingConflictHandler()),
+                new EditorHook(new ConflictValidatorProvider()),
+                concurrentUpdateCheck);
+        store.merge(builder, hooks, CommitInfo.EMPTY);
     }
 
     private static void preAsyncRunStatsStats(AsyncIndexStats stats) {
         stats.start(now());
     }
 
-    private static NodeBuilder postAsyncRunNodeStatus(
-            NodeBuilder builder, String name) {
-        String now = now();
-        builder.getChildNode(INDEX_DEFINITIONS_NAME)
-                .setProperty(name + "-status", STATUS_DONE)
-                .setProperty(name + "-done", now, Type.DATE)
-                .removeProperty(name + "-start");
-        return builder;
-    }
-
-    private static void postAsyncRunStatsStatus(AsyncIndexStats stats) {
+     private static void postAsyncRunStatsStatus(AsyncIndexStats stats) {
         stats.done(now());
     }
 
@@ -398,6 +403,7 @@ public class AsyncIndexUpdate implements
      * Checks whether there are no visible changes between the given states.
      */
     private static boolean noVisibleChanges(NodeState before, NodeState after) {
+        System.err.println("Checking for visible changes...");
         return after.compareAgainstBaseState(before, new NodeStateDiff() {
             @Override
             public boolean propertyAdded(PropertyState after) {
@@ -430,6 +436,7 @@ public class AsyncIndexUpdate implements
     }
 
     private static boolean isHidden(String name) {
+        System.err.println("  checking " + name);
         return name.charAt(0) == ':';
     }
 

Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java?rev=1602798&r1=1602797&r2=1602798&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java Mon Jun 16 05:27:04 2014
@@ -102,7 +102,7 @@ public class IndexUpdate implements Edit
         this.provider = parent.provider;
         this.async = parent.async;
         this.root = parent.root;
-        this.builder = parent.builder.child(checkNotNull(name));
+        this.builder = parent.builder.getChildNode(checkNotNull(name));
         this.updateCallback = parent.updateCallback;
     }
 

Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1602798&r1=1602797&r2=1602798&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Mon Jun 16 05:27:04 2014
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.index;
 
+import static com.google.common.collect.Sets.newHashSet;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_CONTENT_NODE_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
@@ -50,7 +51,6 @@ import org.apache.jackrabbit.oak.spi.com
 import org.apache.jackrabbit.oak.spi.commit.Editor;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.query.PropertyValues;
-import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
 import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -371,15 +371,20 @@ public class AsyncIndexUpdateTest {
     public void cpCleanupNoChanges() throws Exception {
         MemoryNodeStore store = new MemoryNodeStore();
         IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
 
         assertTrue("Expecting no checkpoints",
                 store.listCheckpoints().size() == 0);
 
         // no changes on diff, no checkpoints left behind
-        AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
         async.run();
-        assertTrue("Expecting no checkpoints",
-                store.listCheckpoints().size() == 0);
+        Set<String> checkpoints = newHashSet(store.listCheckpoints());
+        assertTrue("Expecting the initial checkpoint",
+                checkpoints.size() == 1);
+
+        async.run();
+        assertEquals("Expecting no checkpoint changes",
+                checkpoints, store.listCheckpoints());
     }
 
     @Test