You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2016/08/16 09:20:32 UTC

svn commit: r1756505 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java

Author: alexparvulescu
Date: Tue Aug 16 09:20:32 2016
New Revision: 1756505

URL: http://svn.apache.org/viewvc?rev=1756505&view=rev
Log:
OAK-4668 Make async index more resilient on documentmk
 - oak-core fix

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1756505&r1=1756504&r2=1756505&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Tue Aug 16 09:20:32 2016
@@ -201,9 +201,7 @@ public class AsyncIndexUpdate implements
         private final NodeStore store;
 
         /** The base checkpoint */
-        private final String checkpoint;
-
-        private final String afterCheckpoint;
+        private String checkpoint;
 
         /**
          * Property name which stores the temporary checkpoint that need to be released on the next run
@@ -223,21 +221,25 @@ public class AsyncIndexUpdate implements
         /** Expiration time of the last lease we committed */
         private long lease;
 
+        private boolean hasLease = false;
+
         public AsyncUpdateCallback(NodeStore store, String name,
-                long leaseTimeOut, String checkpoint, String afterCheckpoint,
+                long leaseTimeOut, String checkpoint,
                 AsyncIndexStats indexStats, AtomicBoolean forcedStop) {
             this.store = store;
             this.name = name;
             this.forcedStop = forcedStop;
             this.leaseTimeOut = leaseTimeOut;
             this.checkpoint = checkpoint;
-            this.afterCheckpoint = afterCheckpoint;
             this.tempCpName = getTempCpName(name);
             this.indexStats = indexStats;
             this.leaseName = leasify(name);
         }
 
-        protected void prepare() throws CommitFailedException {
+        protected void initLease() throws CommitFailedException {
+            if (hasLease) {
+                return;
+            }
             long now = System.currentTimeMillis();
             this.lease = now + 2 * leaseTimeOut;
 
@@ -251,6 +253,18 @@ public class AsyncIndexUpdate implements
             NodeBuilder async = builder.child(ASYNC);
             async.setProperty(leaseName, lease);
             mergeWithConcurrencyCheck(store, builder, checkpoint, beforeLease, name);
+            hasLease = true;
+        }
+
+        protected void prepare(String afterCheckpoint)
+                throws CommitFailedException {
+            if (!hasLease) {
+                initLease();
+            }
+            NodeState root = store.getRoot();
+            NodeBuilder builder = root.builder();
+            NodeBuilder async = builder.child(ASYNC);
+
             updateTempCheckpoints(async, checkpoint, afterCheckpoint);
             mergeWithConcurrencyCheck(store, builder, checkpoint, lease, name);
 
@@ -311,6 +325,10 @@ public class AsyncIndexUpdate implements
                 }
             }
         }
+
+        public void setCheckpoint(String checkpoint) {
+            this.checkpoint = checkpoint;
+        }
     }
 
     @Override
@@ -390,13 +408,34 @@ public class AsyncIndexUpdate implements
         // find the last indexed state, and check if there are recent changes
         NodeState before;
         String beforeCheckpoint = async.getString(name);
+        AsyncUpdateCallback callback = newAsyncUpdateCallback(store,
+                name, leaseTimeOut, beforeCheckpoint, indexStats,
+                forcedStopFlag);
         if (beforeCheckpoint != null) {
             NodeState state = store.retrieve(beforeCheckpoint);
             if (state == null) {
+                // to make sure we're not reading a stale root rev, we're
+                // attempting a write+read via the lease-grab mechanics
+                try {
+                    callback.initLease();
+                } catch (CommitFailedException e) {
+                    indexStats.failed(e);
+                    return;
+                }
+                root = store.getRoot();
+                beforeCheckpoint = root.getChildNode(ASYNC).getString(name);
+                if (beforeCheckpoint != null) {
+                    state = store.retrieve(beforeCheckpoint);
+                    callback.setCheckpoint(beforeCheckpoint);
+                }
+            }
+
+            if (state == null) {
                 log.warn(
                         "[{}] Failed to retrieve previously indexed checkpoint {}; re-running the initial index update",
                         name, beforeCheckpoint);
                 beforeCheckpoint = null;
+                callback.setCheckpoint(beforeCheckpoint);
                 before = MISSING_NODE;
             } else if (noVisibleChanges(state, root)) {
                 log.debug(
@@ -436,8 +475,8 @@ public class AsyncIndexUpdate implements
             log.trace("Switching thread name to {}", newThreadName);
             threadNameChanged = true;
             Thread.currentThread().setName(newThreadName);
-            updatePostRunStatus = updateIndex(before, beforeCheckpoint,
-                    after, afterCheckpoint, afterTime);
+            updatePostRunStatus = updateIndex(before, beforeCheckpoint, after,
+                    afterCheckpoint, afterTime, callback);
 
             // the update succeeded, i.e. it no longer fails
             if (indexStats.isFailing()) {
@@ -479,23 +518,21 @@ public class AsyncIndexUpdate implements
 
     protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
                                                          String name, long leaseTimeOut, String beforeCheckpoint,
-                                                         String afterCheckpoint, AsyncIndexStats indexStats,
+                                                         AsyncIndexStats indexStats,
                                                          AtomicBoolean stopFlag) {
         return new AsyncUpdateCallback(store, name, leaseTimeOut,
-                beforeCheckpoint, afterCheckpoint, indexStats, stopFlag);
+                beforeCheckpoint, indexStats, stopFlag);
     }
 
-    private boolean updateIndex(NodeState before, String beforeCheckpoint,
-            NodeState after, String afterCheckpoint, String afterTime)
-            throws CommitFailedException {
+    protected boolean updateIndex(NodeState before, String beforeCheckpoint,
+            NodeState after, String afterCheckpoint, String afterTime,
+            AsyncUpdateCallback callback) throws CommitFailedException {
         Stopwatch watch = Stopwatch.createStarted();
         boolean updatePostRunStatus = true;
         boolean progressLogged = false;
-        // create an update callback for tracking index updates
+        // prepare the update callback for tracking index updates
         // and maintaining the update lease
-        AsyncUpdateCallback callback = newAsyncUpdateCallback(store, name,
-                leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats, forcedStopFlag);
-        callback.prepare();
+        callback.prepare(afterCheckpoint);
 
         // check for index tasks split requests, if a split happened, make
         // sure to not delete the reference checkpoint, as the other index
@@ -567,7 +604,7 @@ public class AsyncIndexUpdate implements
         return updatePostRunStatus;
     }
 
-    private static String leasify(String name) {
+    static String leasify(String name) {
         return name + "-lease";
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1756505&r1=1756504&r2=1756505&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Tue Aug 16 09:20:32 2016
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.plugins.index;
 
 import static com.google.common.collect.Sets.newHashSet;
+import static org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.ASYNC;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_CONTENT_NODE_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
@@ -27,6 +28,7 @@ import static org.hamcrest.CoreMatchers.
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -41,12 +43,14 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.management.openmbean.CompositeData;
 
+import com.google.common.collect.Lists;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
@@ -62,6 +66,7 @@ import org.apache.jackrabbit.oak.spi.com
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.Editor;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.query.PropertyValues;
 import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -442,7 +447,7 @@ public class AsyncIndexUpdateTest {
 
         builder = store.getRoot().builder();
         // change cp ref to point to a non-existing one
-        builder.child(AsyncIndexUpdate.ASYNC).setProperty("async", "faulty");
+        builder.child(ASYNC).setProperty("async", "faulty");
         builder.child("testAnother").setProperty("foo", "def");
         store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
 
@@ -466,7 +471,7 @@ public class AsyncIndexUpdateTest {
         Set<String> checkpoints = newHashSet(store.listCheckpoints());
         assertTrue("Expecting the initial checkpoint",
                 checkpoints.size() == 1);
-        assertEquals(store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+        assertEquals(store.getRoot().getChildNode(ASYNC)
                 .getString("async"), checkpoints.iterator().next());
 
         async.run();
@@ -507,7 +512,7 @@ public class AsyncIndexUpdateTest {
                 secondCp.equals(firstCp));
         assertEquals(
                 secondCp,
-                store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+                store.getRoot().getChildNode(ASYNC)
                         .getString("async"));
     }
 
@@ -545,7 +550,7 @@ public class AsyncIndexUpdateTest {
                 secondCp.equals(firstCp));
         assertEquals(
                 secondCp,
-                store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+                store.getRoot().getChildNode(ASYNC)
                         .getString("async"));
     }
 
@@ -631,7 +636,7 @@ public class AsyncIndexUpdateTest {
         assertTrue(
                 "Expecting one temp checkpoint",
                 newHashSet(
-                        store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+                        store.getRoot().getChildNode(ASYNC)
                                 .getStrings("async-temp")).size() == 1);
 
         builder = store.getRoot().builder();
@@ -644,7 +649,7 @@ public class AsyncIndexUpdateTest {
         assertTrue(
                 "Expecting two temp checkpoints",
                 newHashSet(
-                        store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+                        store.getRoot().getChildNode(ASYNC)
                                 .getStrings("async-temp")).size() == 2);
 
         canRelease.set(true);
@@ -659,11 +664,11 @@ public class AsyncIndexUpdateTest {
         String secondCp = mns.listCheckpoints().iterator().next();
         assertEquals(
                 secondCp,
-                store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+                store.getRoot().getChildNode(ASYNC)
                         .getString("async"));
         // the temp cps size is 2 now but the unreferenced checkpoints have been
         // cleared from the store already
-        for (String cp : store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+        for (String cp : store.getRoot().getChildNode(ASYNC)
                 .getStrings("async-temp")) {
             if (cp.equals(secondCp)) {
                 continue;
@@ -702,7 +707,7 @@ public class AsyncIndexUpdateTest {
         String firstCp = store.listCheckpoints().iterator().next();
         assertEquals(
                 firstCp,
-                store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+                store.getRoot().getChildNode(ASYNC)
                         .getString("asyncMissing"));
 
         // second run, simulate an index going away
@@ -724,7 +729,7 @@ public class AsyncIndexUpdateTest {
                 secondCp.equals(firstCp));
         assertEquals(
                 firstCp,
-                store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+                store.getRoot().getChildNode(ASYNC)
                         .getString("asyncMissing"));
     }
 
@@ -847,7 +852,7 @@ public class AsyncIndexUpdateTest {
         String secondCp = checkpoints.iterator().next();
 
         NodeState asyncNode = store.getRoot().getChildNode(
-                AsyncIndexUpdate.ASYNC);
+                ASYNC);
         assertEquals(firstCp, asyncNode.getString("async-slow"));
         assertEquals(secondCp, asyncNode.getString("async"));
         assertFalse(newHashSet(asyncNode.getStrings("async-temp")).contains(
@@ -914,7 +919,7 @@ public class AsyncIndexUpdateTest {
         String secondCp = checkpoints.iterator().next();
 
         NodeState asyncNode = store.getRoot().getChildNode(
-                AsyncIndexUpdate.ASYNC);
+                ASYNC);
         assertEquals(secondCp, asyncNode.getString("async"));
         assertNull(firstCp, asyncNode.getString("async-slow"));
 
@@ -1067,13 +1072,13 @@ public class AsyncIndexUpdateTest {
         final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
             @Override
             protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
-                                                                 String beforeCheckpoint, String afterCheckpoint,
+                                                                 String beforeCheckpoint,
                                                                  AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
                 try {
                     asyncLock.acquire();
                 } catch (InterruptedException ignore) {
                 }
-                return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+                return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint,
                         indexStats, stopFlag);
             }
         };
@@ -1137,13 +1142,13 @@ public class AsyncIndexUpdateTest {
         final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
             @Override
             protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
-                                                                 String beforeCheckpoint, String afterCheckpoint,
+                                                                 String beforeCheckpoint,
                                                                  AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
                 try {
                     asyncLock.acquire();
                 } catch (InterruptedException ignore) {
                 }
-                return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+                return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint,
                         indexStats, stopFlag);
             }
         };
@@ -1211,9 +1216,9 @@ public class AsyncIndexUpdateTest {
         final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
             @Override
             protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
-                                                                 String beforeCheckpoint, String afterCheckpoint,
+                                                                 String beforeCheckpoint,
                                                                  AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
-                return new AsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+                return new AsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint,
                         indexStats, stopFlag){
 
                     @Override
@@ -1310,4 +1315,144 @@ public class AsyncIndexUpdateTest {
         fail("RetryLoop failed, condition is false after " + timeoutSeconds + " seconds: ");
     }
 
+    @Test
+    public void greedyLeaseReindex() throws Exception {
+
+        MemoryNodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        AsyncIndexUpdate pre = new AsyncIndexUpdate("async", store, provider);
+        pre.run();
+        pre.close();
+
+        // rm all cps to simulate 'missing cp scenario'
+        for (String cp : store.listCheckpoints()) {
+            store.release(cp);
+        }
+
+        final AtomicBoolean greedyLease = new AtomicBoolean(false);
+        final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store,
+                provider) {
+            @Override
+            protected AsyncUpdateCallback newAsyncUpdateCallback(
+                    NodeStore store, String name, long leaseTimeOut,
+                    String beforeCheckpoint, AsyncIndexStats indexStats,
+                    AtomicBoolean stopFlag) {
+                return new AsyncUpdateCallback(store, name, leaseTimeOut,
+                        beforeCheckpoint, indexStats, stopFlag) {
+
+                    @Override
+                    protected void initLease() throws CommitFailedException {
+                        greedyLease.set(true);
+                        super.initLease();
+                    }
+
+                    @Override
+                    protected void prepare(String afterCheckpoint)
+                            throws CommitFailedException {
+                        assertTrue(greedyLease.get());
+                        super.prepare(afterCheckpoint);
+                    }
+                };
+            }
+        };
+        async.run();
+        async.close();
+        assertTrue(greedyLease.get());
+    }
+
+    @Test
+    public void checkpointLostEventualConsistent() throws Exception {
+
+        MemoryNodeStore store = new MemoryNodeStore();
+        final List<NodeState> rootStates = Lists.newArrayList();
+        store.addObserver(new Observer() {
+            @Override
+            public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
+                rootStates.add(root);
+            }
+        });
+
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        AsyncIndexUpdate pre = new AsyncIndexUpdate("async", store, provider);
+        pre.run();
+
+        //Create another commit so that we have two checkpoints
+        builder = store.getRoot().builder();
+        builder.child("testRoot2").setProperty("foo", "abc");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        pre.run();
+
+        pre.close();
+
+        //Look for the nodestate just before the final merge in AsyncIndexUpdate
+        //i.e. where older checkpoint was still referred and which has been "released"
+        //post last run
+        Collections.reverse(rootStates);
+        final AtomicReference<NodeState> oldRootState = new AtomicReference<NodeState>();
+        for (NodeState ns : rootStates) {
+            NodeState async = ns.getChildNode(ASYNC);
+            String checkpointName = async.getString("async");
+            if (store.retrieve(checkpointName) == null &&
+                    async.getProperty(AsyncIndexUpdate.leasify("async")) == null){
+                oldRootState.set(ns);
+                break;
+            }
+        }
+
+        assertNotNull(oldRootState.get());
+
+        final AtomicBoolean intiLeaseCalled = new AtomicBoolean(false);
+        //Here for the call to read existing NodeState we would return the old
+        //"stale" state where we have a stale checkpoint
+        store = new MemoryNodeStore(store.getRoot()) {
+            @Override
+            public NodeState getRoot() {
+                //Keep returning stale view untill initlease is not invoked
+                if (!intiLeaseCalled.get()) {
+                    return oldRootState.get();
+                }
+                return super.getRoot();
+            }
+        };
+
+        final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider){
+            @Override
+            protected AsyncUpdateCallback newAsyncUpdateCallback(
+                    NodeStore store, String name, long leaseTimeOut,
+                    String beforeCheckpoint, AsyncIndexStats indexStats,
+                    AtomicBoolean stopFlag) {
+                return new AsyncUpdateCallback(store, name, leaseTimeOut,
+                        beforeCheckpoint, indexStats, stopFlag) {
+
+                    @Override
+                    protected void initLease() throws CommitFailedException {
+                        intiLeaseCalled.set(true);
+                        super.initLease();
+                    }
+                };
+            }
+        };
+        async.run();
+
+        //This run should fail
+        assertTrue(async.getIndexStats().isFailing());
+        async.close();
+    }
+
+
+
 }