You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2015/12/08 09:44:18 UTC

svn commit: r1718533 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/index/ test/java/org/apache/jackrabbit/oak/plugins/index/

Author: alexparvulescu
Date: Tue Dec  8 08:44:18 2015
New Revision: 1718533

URL: http://svn.apache.org/viewvc?rev=1718533&view=rev
Log:
OAK-3436 Prevent missing checkpoint due to unstable topology from causing complete reindexing
 - cosmetic changes to make the async index code more testable and a few tests, no fixes yet

Added:
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/package-info.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1718533&r1=1718532&r2=1718533&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Tue Dec  8 08:44:18 2015
@@ -95,7 +95,7 @@ public class AsyncIndexUpdate implements
      * timed out. Another node in cluster would wait for timeout before
      * taking over a running job
      */
-    private static final long ASYNC_TIMEOUT;
+    private static final long DEFAULT_ASYNC_TIMEOUT;
 
     static {
         int value = 15;
@@ -105,7 +105,7 @@ public class AsyncIndexUpdate implements
         } catch (NumberFormatException e) {
             // use default
         }
-        ASYNC_TIMEOUT = TimeUnit.MINUTES.toMillis(value);
+        DEFAULT_ASYNC_TIMEOUT = TimeUnit.MINUTES.toMillis(value);
     }
 
     private final String name;
@@ -137,15 +137,12 @@ public class AsyncIndexUpdate implements
 
     private final MissingIndexProviderStrategy missingStrategy = new DefaultMissingIndexProviderStrategy();
 
-    /**
-     * Property name which stores the temporary checkpoint that need to be released on the next run
-     */
-    private final String tempCpName;
-
     private final IndexTaskSpliter taskSplitter = new IndexTaskSpliter();
 
     private IndexMBeanRegistration mbeanRegistration;
 
+    private long leaseTimeOut;
+
     /**
      * Controls the length of the interval (in minutes) at which an indexing
      * error is logged as 'warning'. for the rest of the indexing cycles errors
@@ -158,10 +155,10 @@ public class AsyncIndexUpdate implements
             @Nonnull IndexEditorProvider provider, boolean switchOnSync) {
         this.name = checkNotNull(name);
         this.lastIndexedTo = name + "-LastIndexedTo";
-        this.tempCpName = name + "-temp";
         this.store = checkNotNull(store);
         this.provider = checkNotNull(provider);
         this.switchOnSync = switchOnSync;
+        this.leaseTimeOut = DEFAULT_ASYNC_TIMEOUT;
     }
 
     public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
@@ -175,22 +172,47 @@ public class AsyncIndexUpdate implements
      *
      * @see <a href="https://issues.apache.org/jira/browse/OAK-1292">OAK-1292</a>
      */
-    private class AsyncUpdateCallback implements IndexUpdateCallback {
+    protected static class AsyncUpdateCallback implements IndexUpdateCallback {
+
+        private final NodeStore store;
 
         /** The base checkpoint */
         private final String checkpoint;
 
+        private final String afterCheckpoint;
+
+        /**
+         * Property name which stores the temporary checkpoint that needs to be released on the next run
+         */
+        private final String tempCpName;
+
+        private final long leaseTimeOut;
+
+        private final String name;
+
+        private final String leaseName;
+
+        private final AsyncIndexStats indexStats;
+
         /** Expiration time of the last lease we committed */
         private long lease;
 
-        private final String leaseName;
+        public AsyncUpdateCallback(NodeStore store, String name,
+                long leaseTimeOut, String checkpoint, String afterCheckpoint,
+                AsyncIndexStats indexStats) {
+            this.store = store;
+            this.name = name;
+            this.leaseTimeOut = leaseTimeOut;
+            this.checkpoint = checkpoint;
+            this.afterCheckpoint = afterCheckpoint;
+            this.tempCpName = getTempCpName(name);
+            this.indexStats = indexStats;
+            this.leaseName = leasify(name);
+        }
 
-        public AsyncUpdateCallback(String checkpoint, String afterCheckpoint)
-                throws CommitFailedException {
+        protected void prepare() throws CommitFailedException {
             long now = System.currentTimeMillis();
-            this.checkpoint = checkpoint;
-            this.lease = now + 2 * ASYNC_TIMEOUT;
-            this.leaseName = name + "-lease";
+            this.lease = now + 2 * leaseTimeOut;
 
             NodeState root = store.getRoot();
             long beforeLease = root.getChildNode(ASYNC).getLong(leaseName);
@@ -202,7 +224,7 @@ public class AsyncIndexUpdate implements
             NodeBuilder async = builder.child(ASYNC);
             async.setProperty(leaseName, lease);
             updateTempCheckpoints(async, checkpoint, afterCheckpoint);
-            mergeWithConcurrencyCheck(builder, checkpoint, beforeLease);
+            mergeWithConcurrencyCheck(store, builder, checkpoint, beforeLease, name);
 
             // reset updates counter
             indexStats.resetUpdates();
@@ -240,23 +262,22 @@ public class AsyncIndexUpdate implements
             NodeBuilder builder = store.getRoot().builder();
             NodeBuilder async = builder.child(ASYNC);
             async.removeProperty(leaseName);
-            mergeWithConcurrencyCheck(builder, async.getString(name), lease);
+            mergeWithConcurrencyCheck(store, builder, async.getString(name), lease, name);
         }
 
         @Override
         public void indexUpdate() throws CommitFailedException {
             if (indexStats.incUpdates() % 100 == 0) {
                 long now = System.currentTimeMillis();
-                if (now + ASYNC_TIMEOUT > lease) {
-                    long newLease = now + 2 * ASYNC_TIMEOUT;
+                if (now + leaseTimeOut > lease) {
+                    long newLease = now + 2 * leaseTimeOut;
                     NodeBuilder builder = store.getRoot().builder();
                     builder.child(ASYNC).setProperty(leaseName, newLease);
-                    mergeWithConcurrencyCheck(builder, checkpoint, lease);
+                    mergeWithConcurrencyCheck(store, builder, checkpoint, lease, name);
                     lease = newLease;
                 }
             }
         }
-
     }
 
     @Override
@@ -270,12 +291,13 @@ public class AsyncIndexUpdate implements
 
         // check for concurrent updates
         NodeState async = root.getChildNode(ASYNC);
-        long leaseEndTime = async.getLong(name + "-lease");
+        long leaseEndTime = async.getLong(leasify(name));
         long currentTime = System.currentTimeMillis();
         if (leaseEndTime > currentTime) {
-            log.debug(
-                    "[{}] Another copy of the index update is already running; skipping this update. Time left for lease to expire {}s",
-                    name, (leaseEndTime - currentTime) / 1000);
+            long leaseExpMsg = (leaseEndTime - currentTime) / 1000;
+            String err = "Another copy of the index update is already running; skipping this update. Time left for lease to expire "
+                    + leaseExpMsg + "s";
+            indexStats.failed(new Exception(err, CONCURRENT_UPDATE));
             return;
         }
 
@@ -331,7 +353,6 @@ public class AsyncIndexUpdate implements
             log.trace("Switching thread name to {}", newThreadName);
             threadNameChanged = true;
             Thread.currentThread().setName(newThreadName);
-            
             updatePostRunStatus = updateIndex(before, beforeCheckpoint,
                     after, afterCheckpoint, afterTime);
 
@@ -373,8 +394,14 @@ public class AsyncIndexUpdate implements
         }
     }
 
-    private boolean updateIndex(
-            NodeState before, String beforeCheckpoint,
+    protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
+            String name, long leaseTimeOut, String beforeCheckpoint,
+            String afterCheckpoint, AsyncIndexStats indexStats) {
+        return new AsyncUpdateCallback(store, name, leaseTimeOut,
+                beforeCheckpoint, afterCheckpoint, indexStats);
+    }
+
+    private boolean updateIndex(NodeState before, String beforeCheckpoint,
             NodeState after, String afterCheckpoint, String afterTime)
             throws CommitFailedException {
         Stopwatch watch = Stopwatch.createStarted();
@@ -382,8 +409,9 @@ public class AsyncIndexUpdate implements
         boolean progressLogged = false;
         // create an update callback for tracking index updates
         // and maintaining the update lease
-        AsyncUpdateCallback callback =
-                new AsyncUpdateCallback(beforeCheckpoint, afterCheckpoint);
+        AsyncUpdateCallback callback = newAsyncUpdateCallback(store, name,
+                leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats);
+        callback.prepare();
 
         // check for index tasks split requests, if a split happened, make
         // sure to not delete the reference checkpoint, as the other index
@@ -431,7 +459,8 @@ public class AsyncIndexUpdate implements
                 }
                 updatePostRunStatus = true;
             }
-            mergeWithConcurrencyCheck(builder, beforeCheckpoint, callback.lease);
+            mergeWithConcurrencyCheck(store, builder, beforeCheckpoint,
+                    callback.lease, name);
             if (indexUpdate.isReindexingPerformed()) {
                 log.info("[{}] Reindexing completed for indexes: {} in {}",
                         name, indexUpdate.getReindexStats(), watch);
@@ -454,9 +483,17 @@ public class AsyncIndexUpdate implements
         return updatePostRunStatus;
     }
 
-    private void mergeWithConcurrencyCheck(
-            NodeBuilder builder, final String checkpoint, final long lease)
-            throws CommitFailedException {
+    private static String leasify(String name) {
+        return name + "-lease";
+    }
+
+    private static String getTempCpName(String name) {
+        return name + "-temp";
+    }
+
+    private static void mergeWithConcurrencyCheck(final NodeStore store,
+            NodeBuilder builder, final String checkpoint, final long lease,
+            final String name) throws CommitFailedException {
         CommitHook concurrentUpdateCheck = new CommitHook() {
             @Override @Nonnull
             public NodeState processCommit(
@@ -465,7 +502,7 @@ public class AsyncIndexUpdate implements
                 // check for concurrent updates by this async task
                 NodeState async = before.getChildNode(ASYNC);
                 if (checkpoint == null || Objects.equal(checkpoint, async.getString(name))
-                        && lease == async.getLong(name + "-lease")) {
+                        && lease == async.getLong(leasify(name))) {
                     return after;
                 } else {
                     throw CONCURRENT_UPDATE;
@@ -488,6 +525,14 @@ public class AsyncIndexUpdate implements
         }
     }
 
+    /**
+     * Sets the lease timeout, in milliseconds
+     */
+    protected AsyncIndexUpdate setLeaseTimeOut(long leaseTimeOut) {
+        this.leaseTimeOut = leaseTimeOut;
+        return this;
+    }
+
     private static void preAsyncRunStatsStats(AsyncIndexStats stats) {
         stats.start(now());
     }
@@ -919,6 +964,7 @@ public class AsyncIndexUpdate implements
         private void split(@CheckForNull String refCheckpoint, long lease) throws CommitFailedException {
             NodeBuilder builder = store.getRoot().builder();
             if (refCheckpoint != null) {
+                String tempCpName = getTempCpName(name);
                 NodeBuilder async = builder.child(ASYNC);
                 // add new reference
                 async.setProperty(newIndexTaskName, refCheckpoint);
@@ -948,7 +994,7 @@ public class AsyncIndexUpdate implements
             }
 
             if (!updated.isEmpty()) {
-                mergeWithConcurrencyCheck(builder, refCheckpoint, lease);
+                mergeWithConcurrencyCheck(store, builder, refCheckpoint, lease, name);
                 log.info(
                         "[{}] Successfully split index definitions {} to async task named {} with referenced checkpoint {}.",
                         name, updated, newIndexTaskName, refCheckpoint);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/package-info.java?rev=1718533&r1=1718532&r2=1718533&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/package-info.java Tue Dec  8 08:44:18 2015
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@Version("3.0.0")
+@Version("3.1.0")
 @Export(optional = "provide:=true")
 package org.apache.jackrabbit.oak.plugins.index;
 

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java?rev=1718533&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java Tue Dec  8 08:44:18 2015
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.jackrabbit.oak.NodeStoreFixture;
+import org.apache.jackrabbit.oak.OakBaseTest;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncIndexStats;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncUpdateCallback;
+import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+public class AsyncIndexUpdateLeaseTest extends OakBaseTest {
+
+    private final String name = "async";
+    private MemoryNodeStore store;
+    private IndexEditorProvider provider;
+
+    private final AtomicBoolean executed = new AtomicBoolean(false);
+
+    public AsyncIndexUpdateLeaseTest(NodeStoreFixture fixture) {
+        super(fixture);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        store = new MemoryNodeStore();
+        provider = new PropertyIndexEditorProvider();
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, name);
+        builder.child("testRoot").setProperty("foo", "abc");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        executed.set(false);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        assertTrue("Test method was not executed", executed.get());
+        String referenced = getReferenceCp(store, name);
+        assertNotNull("Reference checkpoint doesn't exist", referenced);
+        assertNotNull(
+                "Failed indexer must not clean successful indexer's checkpoint",
+                store.retrieve(referenced));
+    }
+
+    @Test
+    @Ignore("OAK-3436")
+    public void testPrePrepare() throws Exception {
+        // take care of initial reindex before
+        new AsyncIndexUpdate(name, store, provider).run();
+
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void prePrepare() {
+                executed.set(true);
+                assertRunOk(new AsyncIndexUpdate(name, store, provider));
+            }
+        };
+        assertRunKo(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+    }
+
+    @Test
+    public void testPostPrepare() {
+        // take care of initial reindex before
+        new AsyncIndexUpdate(name, store, provider).run();
+
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void postPrepare() {
+                executed.set(true);
+                // lease must prevent this run
+                assertRunKo(new AsyncIndexUpdate(name, store, provider));
+            }
+        };
+        assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+    }
+
+    @Test
+    public void testPreIndexUpdate() throws Exception {
+        // take care of initial reindex before
+        new AsyncIndexUpdate(name, store, provider).run();
+
+        testContent(store);
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void preIndexUpdate() {
+                executed.set(true);
+                assertRunKo(new AsyncIndexUpdate(name, store, provider));
+            }
+        };
+        assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+    }
+
+    @Test
+    public void testPostIndexUpdate() throws Exception {
+        // take care of initial reindex before
+        new AsyncIndexUpdate(name, store, provider).run();
+
+        testContent(store);
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void postIndexUpdate() {
+                executed.set(true);
+                assertRunKo(new AsyncIndexUpdate(name, store, provider));
+            }
+        };
+        assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+    }
+
+    @Test
+    public void testPreClose() throws Exception {
+        // take care of initial reindex before
+        new AsyncIndexUpdate(name, store, provider).run();
+
+        testContent(store);
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void preClose() {
+                executed.set(true);
+                assertRunKo(new AsyncIndexUpdate(name, store, provider));
+            }
+        };
+        assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+    }
+
+    @Test
+    public void testPostPrepareLeaseExpired() throws Exception {
+        // take care of initial reindex before
+        new AsyncIndexUpdate(name, store, provider).run();
+        final long lease = 50;
+
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void postPrepare() {
+                executed.set(true);
+                try {
+                    TimeUnit.MILLISECONDS.sleep(lease * 3);
+                } catch (InterruptedException e) {
+                    // interruption ignored: the sleep only waits for the lease to expire
+                }
+                assertRunOk(new AsyncIndexUpdate(name, store, provider));
+            }
+        };
+        assertRunKo(new SpecialAsyncIndexUpdate(name, store, provider, l1)
+                .setLeaseTimeOut(lease));
+    }
+
+    @Test
+    public void testPreIndexUpdateLeaseExpired() throws Exception {
+        // take care of initial reindex before
+        new AsyncIndexUpdate(name, store, provider).run();
+
+        // add extra indexed content
+        testContent(store);
+
+        final long lease = 50;
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void preIndexUpdate() {
+                executed.set(true);
+                try {
+                    TimeUnit.MILLISECONDS.sleep(lease * 3);
+                } catch (InterruptedException e) {
+                    // interruption ignored: the sleep only waits for the lease to expire
+                }
+                assertRunOk(new AsyncIndexUpdate(name, store, provider));
+            }
+        };
+        assertRunKo(new SpecialAsyncIndexUpdate(name, store, provider, l1)
+                .setLeaseTimeOut(lease));
+    }
+
+    @Test
+    public void testPostIndexUpdateLeaseExpired() throws Exception {
+        // take care of initial reindex before
+        new AsyncIndexUpdate(name, store, provider).run();
+
+        // add extra indexed content
+        testContent(store);
+
+        final long lease = 50;
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void postIndexUpdate() {
+                executed.set(true);
+                try {
+                    TimeUnit.MILLISECONDS.sleep(lease * 3);
+                } catch (InterruptedException e) {
+                    // interruption ignored: the sleep only waits for the lease to expire
+                }
+                assertRunOk(new AsyncIndexUpdate(name, store, provider));
+            }
+        };
+        assertRunKo(new SpecialAsyncIndexUpdate(name, store, provider, l1)
+                .setLeaseTimeOut(lease));
+    }
+
+    @Test
+    public void testPrePrepareRexindex() throws Exception {
+
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void prePrepare() {
+                executed.set(true);
+                assertRunOk(new AsyncIndexUpdate(name, store, provider));
+            }
+        };
+        assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+    }
+
+    @Test
+    public void testPostPrepareReindex() {
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void postPrepare() {
+                executed.set(true);
+                // lease must prevent this run
+                assertRunKo(new AsyncIndexUpdate(name, store, provider));
+            }
+        };
+        assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+    }
+
+    @Test
+    public void testPreIndexUpdateReindex() throws Exception {
+        testContent(store);
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void preIndexUpdate() {
+                executed.set(true);
+                assertRunKo(new AsyncIndexUpdate(name, store, provider));
+            }
+        };
+        assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+    }
+
+    @Test
+    public void testPostIndexUpdateReindex() throws Exception {
+        testContent(store);
+        final IndexStatusListener l1 = new IndexStatusListener() {
+
+            @Override
+            protected void postIndexUpdate() {
+                executed.set(true);
+                assertRunKo(new AsyncIndexUpdate(name, store, provider));
+            }
+        };
+        assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+    }
+
+    // -------------------------------------------------------------------
+
+    private static String getReferenceCp(NodeStore store, String name) {
+        return store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+                .getString(name);
+    }
+
+    private void assertRunOk(AsyncIndexUpdate a) {
+        assertRun(a, false);
+    }
+
+    private void assertRunKo(AsyncIndexUpdate a) {
+        assertRun(a, true);
+        assertConcurrentUpdate(a.getIndexStats());
+    }
+
+    private void assertRun(AsyncIndexUpdate a, boolean status) {
+        a.run();
+        assertEquals("Unexpected failiure flag", status, a.isFailing());
+    }
+
+    private void assertConcurrentUpdate(AsyncIndexStats stats) {
+        assertTrue("Error must be of type 'Concurrent update'", stats
+                .getLatestError().contains("Concurrent update detected"));
+    }
+
+    private static void testContent(NodeStore store) throws Exception {
+        NodeBuilder builder = store.getRoot().builder();
+        builder.child("testRoot").setProperty("foo",
+                "abc " + System.currentTimeMillis());
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+    }
+
+    private static class SpecialAsyncIndexUpdate extends AsyncIndexUpdate {
+
+        private final IndexStatusListener listener;
+
+        public SpecialAsyncIndexUpdate(String name, NodeStore store,
+                IndexEditorProvider provider, IndexStatusListener listener) {
+            super(name, store, provider);
+            this.listener = listener;
+        }
+
+        @Override
+        public synchronized void run() {
+            super.run();
+        }
+
+        @Override
+        protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
+                String name, long leaseTimeOut, String checkpoint,
+                String afterCheckpoint, AsyncIndexStats indexStats) {
+            return new SpecialAsyncUpdateCallback(store, name, leaseTimeOut,
+                    checkpoint, afterCheckpoint, indexStats, listener);
+        }
+    }
+
+    private static class SpecialAsyncUpdateCallback extends AsyncUpdateCallback {
+
+        private IndexStatusListener listener;
+
+        public SpecialAsyncUpdateCallback(NodeStore store, String name,
+                long leaseTimeOut, String checkpoint, String afterCheckpoint,
+                AsyncIndexStats indexStats, IndexStatusListener listener) {
+            super(store, name, leaseTimeOut, checkpoint, afterCheckpoint,
+                    indexStats);
+            this.listener = listener;
+        }
+
+        @Override
+        protected void prepare() throws CommitFailedException {
+            listener.prePrepare();
+            super.prepare();
+            listener.postPrepare();
+        }
+
+        @Override
+        public void indexUpdate() throws CommitFailedException {
+            listener.preIndexUpdate();
+            super.indexUpdate();
+            listener.postIndexUpdate();
+        }
+
+        @Override
+        void close() throws CommitFailedException {
+            listener.preClose();
+            super.close();
+            listener.postClose();
+        }
+
+    }
+
+    private abstract static class IndexStatusListener {
+
+        protected void prePrepare() {
+        }
+
+        protected void postPrepare() {
+        }
+
+        protected void preIndexUpdate() {
+        }
+
+        protected void postIndexUpdate() {
+        }
+
+        protected void preClose() {
+        }
+
+        protected void postClose() {
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain