You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2017/05/02 06:44:00 UTC

svn commit: r1793435 - in /jackrabbit/oak/trunk: oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/ oak-it/src/test/ja...

Author: chetanm
Date: Tue May  2 06:43:59 2017
New Revision: 1793435

URL: http://svn.apache.org/viewvc?rev=1793435&view=rev
Log:
OAK-5893 - Async index abort should work even during traversals without index updates

-- Introduced a new NodeTraversalCallback which would be called for every
   changed node
-- Exposed node read count as part of IndexStatsMBean

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/NodeTraversalCallback.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
    jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java

Modified: jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1793435&r1=1793434&r2=1793435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java (original)
+++ jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java Tue May  2 06:43:59 2017
@@ -104,6 +104,15 @@ public interface IndexStatsMBean {
     long getUpdates();
 
     /**
+     * Returns the number of nodes which have been read so far. This value is
+     * kept until the next cycle begins.
+     *
+     * @return the number of nodes read in the current run cycle. This value is
+     *         kept until the next cycle begins.
+     */
+    long getNodesReadCount();
+
+    /**
      * Returns the current reference checkpoint used by the async indexer
      * 
      * @return the reference checkpoint

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1793435&r1=1793434&r2=1793435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Tue May  2 06:43:59 2017
@@ -236,8 +236,12 @@ public class AsyncIndexUpdate implements
      *
      * @see <a href="https://issues.apache.org/jira/browse/OAK-1292">OAK-1292</a>
      */
-    protected static class AsyncUpdateCallback implements IndexUpdateCallback {
-
+    protected static class AsyncUpdateCallback implements IndexUpdateCallback, NodeTraversalCallback {
+        /**
+         * Number of traversed nodes after which the current time is
+         * checked against the lease to see whether it needs to be renewed
+         */
+        public static final int LEASE_CHECK_INTERVAL = 10;
         private final NodeStore store;
 
         /** The base checkpoint */
@@ -288,7 +292,7 @@ public class AsyncIndexUpdate implements
             NodeState root = store.getRoot();
             NodeState async = root.getChildNode(ASYNC);
             if(isLeaseCheckEnabled(leaseTimeOut)) {
-                long now = System.currentTimeMillis();
+                long now = getTime();
                 this.lease = now + 2 * leaseTimeOut;
                 long beforeLease = async.getLong(leaseName);
                 if (beforeLease > now) {
@@ -324,7 +328,7 @@ public class AsyncIndexUpdate implements
             mergeWithConcurrencyCheck(store, validatorProviders, builder, checkpoint, lease, name);
 
             // reset updates counter
-            indexStats.resetUpdates();
+            indexStats.reset();
         }
 
         private void updateTempCheckpoints(NodeBuilder async,
@@ -367,13 +371,16 @@ public class AsyncIndexUpdate implements
 
         @Override
         public void indexUpdate() throws CommitFailedException {
-            if (forcedStop.get()){
-                forcedStop.set(false);
-                throw INTERRUPTED;
-            }
+            checkIfStopped();
+            indexStats.incUpdates();
+        }
+
+        @Override
+        public void traversedNode() throws CommitFailedException{
+            checkIfStopped();
 
-            if (indexStats.incUpdates() % 100 == 0 && isLeaseCheckEnabled(leaseTimeOut)) {
-                long now = System.currentTimeMillis();
+            if (indexStats.incTraversal() % LEASE_CHECK_INTERVAL == 0 && isLeaseCheckEnabled(leaseTimeOut)) {
+                long now = getTime();
                 if (now + leaseTimeOut > lease) {
                     long newLease = now + 2 * leaseTimeOut;
                     NodeBuilder builder = store.getRoot().builder();
@@ -384,6 +391,10 @@ public class AsyncIndexUpdate implements
             }
         }
 
+        protected long getTime() {
+            return System.currentTimeMillis();
+        }
+
         public void setCheckpoint(String checkpoint) {
             this.checkpoint = checkpoint;
         }
@@ -391,6 +402,13 @@ public class AsyncIndexUpdate implements
         public void setValidatorProviders(List<ValidatorProvider> validatorProviders) {
             this.validatorProviders = checkNotNull(validatorProviders);
         }
+
+        private void checkIfStopped() throws CommitFailedException {
+            if (forcedStop.get()){
+                forcedStop.set(false);
+                throw INTERRUPTED;
+            }
+        }
     }
 
     @Override
@@ -689,7 +707,7 @@ public class AsyncIndexUpdate implements
             markFailingIndexesAsCorrupt(builder);
 
             indexUpdate =
-                    new IndexUpdate(provider, name, after, builder, callback, CommitInfo.EMPTY, corruptIndexHandler)
+                    new IndexUpdate(provider, name, after, builder, callback, callback, CommitInfo.EMPTY, corruptIndexHandler)
                     .withMissingProviderStrategy(missingStrategy);
             CommitFailedException exception =
                     EditorDiff.process(VisibleEditor.wrap(indexUpdate), before, after);
@@ -888,6 +906,7 @@ public class AsyncIndexUpdate implements
 
         private volatile boolean isPaused;
         private volatile long updates;
+        private volatile long nodesRead;
         private final Stopwatch watch = Stopwatch.createUnstarted();
         private final ExecutionStats execStats;
 
@@ -1034,13 +1053,17 @@ public class AsyncIndexUpdate implements
             return this.isPaused;
         }
 
-        void resetUpdates() {
+        void reset() {
             this.updates = 0;
+            this.nodesRead = 0;
         }
 
         long incUpdates() {
-            updates++;
-            return updates;
+            return ++updates;
+        }
+
+        long incTraversal() {
+            return ++nodesRead;
         }
 
         @Override
@@ -1048,6 +1071,11 @@ public class AsyncIndexUpdate implements
             return updates;
         }
 
+        @Override
+        public long getNodesReadCount(){
+            return nodesRead;
+        }
+
         void setReferenceCheckpoint(String checkpoint) {
             this.referenceCp = checkpoint;
         }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java?rev=1793435&r1=1793434&r2=1793435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java Tue May  2 06:43:59 2017
@@ -133,17 +133,18 @@ public class IndexUpdate implements Edit
             IndexEditorProvider provider, String async,
             NodeState root, NodeBuilder builder,
             IndexUpdateCallback updateCallback, CommitInfo commitInfo) {
-        this(provider, async, root, builder, updateCallback, commitInfo, CorruptIndexHandler.NOOP);
+        this(provider, async, root, builder, updateCallback, NodeTraversalCallback.NOOP, commitInfo, CorruptIndexHandler.NOOP);
     }
 
     public IndexUpdate(
             IndexEditorProvider provider, String async,
             NodeState root, NodeBuilder builder,
-            IndexUpdateCallback updateCallback, CommitInfo commitInfo, CorruptIndexHandler corruptIndexHandler) {
+            IndexUpdateCallback updateCallback, NodeTraversalCallback traversalCallback,
+            CommitInfo commitInfo, CorruptIndexHandler corruptIndexHandler) {
         this.parent = null;
         this.name = null;
         this.path = "/";
-        this.rootState = new IndexUpdateRootState(provider, async, root, updateCallback, commitInfo, corruptIndexHandler);
+        this.rootState = new IndexUpdateRootState(provider, async, root, updateCallback, traversalCallback, commitInfo, corruptIndexHandler);
         this.builder = checkNotNull(builder);
     }
 
@@ -157,6 +158,7 @@ public class IndexUpdate implements Edit
     @Override
     public void enter(NodeState before, NodeState after)
             throws CommitFailedException {
+        rootState.nodeRead();
         collectIndexEditors(builder.getChildNode(INDEX_DEFINITIONS_NAME), before);
 
         if (!reindex.isEmpty()) {
@@ -369,7 +371,6 @@ public class IndexUpdate implements Edit
     @Override @Nonnull
     public Editor childNodeAdded(String name, NodeState after)
             throws CommitFailedException {
-        rootState.nodeRead(name);
         List<Editor> children = newArrayListWithCapacity(1 + editors.size());
         children.add(new IndexUpdate(this, name));
         for (Editor editor : editors) {
@@ -385,7 +386,6 @@ public class IndexUpdate implements Edit
     public Editor childNodeChanged(
             String name, NodeState before, NodeState after)
             throws CommitFailedException {
-        rootState.nodeRead(name);
         List<Editor> children = newArrayListWithCapacity(1 + editors.size());
         children.add(new IndexUpdate(this, name));
         for (Editor editor : editors) {
@@ -498,6 +498,7 @@ public class IndexUpdate implements Edit
          * Callback for the update events of the indexing job
          */
         final IndexUpdateCallback updateCallback;
+        final NodeTraversalCallback traversalCallback;
         final Set<String> reindexedIndexes = Sets.newHashSet();
         final Map<String, CountingCallback> callbacks = Maps.newHashMap();
         final CorruptIndexHandler corruptIndexHandler;
@@ -506,13 +507,15 @@ public class IndexUpdate implements Edit
         private MissingIndexProviderStrategy missingProvider = new MissingIndexProviderStrategy();
 
         private IndexUpdateRootState(IndexEditorProvider provider, String async, NodeState root,
-                                     IndexUpdateCallback updateCallback, CommitInfo commitInfo, CorruptIndexHandler corruptIndexHandler) {
+                                     IndexUpdateCallback updateCallback, NodeTraversalCallback traversalCallback,
+                                     CommitInfo commitInfo, CorruptIndexHandler corruptIndexHandler) {
             this.provider = checkNotNull(provider);
             this.async = async;
             this.root = checkNotNull(root);
             this.updateCallback = checkNotNull(updateCallback);
             this.commitInfo = commitInfo;
             this.corruptIndexHandler = corruptIndexHandler;
+            this.traversalCallback = traversalCallback;
         }
 
         public IndexUpdateCallback newCallback(String indexPath, boolean reindex) {
@@ -571,8 +574,9 @@ public class IndexUpdate implements Edit
             return !reindexedIndexes.isEmpty();
         }
 
-        public void nodeRead(String name){
+        public void nodeRead() throws CommitFailedException {
             changedNodeCount++;
+            traversalCallback.traversedNode();
         }
 
         public void propertyChanged(String name){

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java?rev=1793435&r1=1793434&r2=1793435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java Tue May  2 06:43:59 2017
@@ -68,7 +68,7 @@ public class IndexUpdateProvider impleme
             NodeState before, NodeState after,
             NodeBuilder builder, CommitInfo info) {
 
-        IndexUpdate editor = new IndexUpdate(provider, async, after, builder, NOOP_CALLBACK, info, corruptIndexHandler)
+        IndexUpdate editor = new IndexUpdate(provider, async, after, builder, NOOP_CALLBACK, NodeTraversalCallback.NOOP, info, corruptIndexHandler)
                 .withMissingProviderStrategy(missingStrategy);
         editor.setIgnoreReindexFlags(ignoreReindexFlags);
         return VisibleEditor.wrap(editor);

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/NodeTraversalCallback.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/NodeTraversalCallback.java?rev=1793435&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/NodeTraversalCallback.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/NodeTraversalCallback.java Tue May  2 06:43:59 2017
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+
+/**
+ * Callback which is invoked for any changed node read by IndexUpdate
+ * as part of diff traversal
+ */
+interface NodeTraversalCallback{
+    NodeTraversalCallback NOOP = new NodeTraversalCallback() {
+        @Override
+        public void traversedNode() throws CommitFailedException {
+
+        }
+    };
+
+
+    void traversedNode() throws CommitFailedException;
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/NodeTraversalCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1793435&r1=1793434&r2=1793435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Tue May  2 06:43:59 2017
@@ -1773,6 +1773,47 @@ public class AsyncIndexUpdateTest {
         }
     }
 
+    @Test
+    public void traversalCount() throws Exception{
+        MemoryNodeStore store = new MemoryNodeStore();
+        PropertyIndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+        async.run();
+
+        //Get rid of changes in index nodes i.e. /oak:index/rootIndex
+        async.run();
+
+        //Do a run without any index property change
+        builder = store.getRoot().builder();
+        builder.child("a").child("b");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        async.run();
+
+        AsyncIndexStats stats = async.getIndexStats();
+        assertEquals(3, stats.getNodesReadCount());
+        assertEquals(0, stats.getUpdates());
+
+        //Do a run with an index property change
+        builder = store.getRoot().builder();
+        builder.child("a").child("b").setProperty("foo", "bar");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        async.run();
+
+        stats = async.getIndexStats();
+        assertEquals(3, stats.getNodesReadCount());
+        assertEquals(1, stats.getUpdates());
+    }
+
     private static class TestIndexEditorProvider extends PropertyIndexEditorProvider {
         private String indexPathToFail;
         @Override

Modified: jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java?rev=1793435&r1=1793434&r2=1793435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java (original)
+++ jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java Tue May  2 06:43:59 2017
@@ -24,9 +24,11 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.collect.Sets;
 import org.apache.jackrabbit.oak.OakBaseTest;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
@@ -37,6 +39,7 @@ import org.apache.jackrabbit.oak.spi.com
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -396,8 +399,52 @@ public class AsyncIndexUpdateLeaseTest e
                         .hasProperty(AsyncIndexUpdate.leasify(name)));
     }
 
+    @Test
+    public void testLeaseUpdateAndNumberOfChanges() throws Exception {
+        final long lease = 50;
+        testContent(store, AsyncUpdateCallback.LEASE_CHECK_INTERVAL / 2);
+        Set<Long> leaseTimes = Sets.newHashSet();
+        final Clock.Virtual clock = new Clock.Virtual();
+        final IndexStatusListener l1 = new IndexStatusListener() {
+            @Override
+            protected void postPrepare() {
+                collectLeaseTimes();
+            }
+
+            @Override
+            protected void postTraverseNode() {
+                collectLeaseTimes();
+                if (!executed.get()) {
+                   clock.waitUntil(clock.getTime() + lease * 3);
+                }
+                executed.set(true);
+            }
+
+            private void collectLeaseTimes() {
+                leaseTimes.add(getLeaseValue());
+            }
+        };
+        assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1, clock)
+                .setLeaseTimeOut(lease));
+
+        assertEquals(1, leaseTimes.size());
+
+        executed.set(false);
+        leaseTimes.clear();
+
+        //Run with more changes than the threshold; the lease should then change more than once
+        testContent(store, AsyncUpdateCallback.LEASE_CHECK_INTERVAL * 2);
+        assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1, clock)
+                .setLeaseTimeOut(lease));
+        assertTrue(leaseTimes.size() > 1);
+    }
+
     // -------------------------------------------------------------------
 
+    private long getLeaseValue() {
+        return store.getRoot().getChildNode(":async").getLong( AsyncIndexUpdate.leasify(name));
+    }
+
     private static String getReferenceCp(NodeStore store, String name) {
         return store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
                 .getString(name);
@@ -429,14 +476,31 @@ public class AsyncIndexUpdateLeaseTest e
         store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
     }
 
+    private static void testContent(NodeStore store, int numOfNodes) throws Exception {
+        NodeBuilder builder = store.getRoot().builder();
+        for (int i = 0; i < numOfNodes; i++) {
+            builder.child("testRoot"+i).setProperty("foo",
+                    "abc " + System.currentTimeMillis());
+        }
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+    }
+
     private static class SpecialAsyncIndexUpdate extends AsyncIndexUpdate {
 
         private final IndexStatusListener listener;
+        private final Clock clock;
 
         public SpecialAsyncIndexUpdate(String name, NodeStore store,
-                IndexEditorProvider provider, IndexStatusListener listener) {
+                                       IndexEditorProvider provider, IndexStatusListener listener) {
+            this(name, store, provider, listener, Clock.SIMPLE);
+        }
+
+        public SpecialAsyncIndexUpdate(String name, NodeStore store,
+                                       IndexEditorProvider provider, IndexStatusListener listener, Clock clock) {
             super(name, store, provider);
             this.listener = listener;
+            this.clock = clock;
         }
 
         @Override
@@ -450,19 +514,21 @@ public class AsyncIndexUpdateLeaseTest e
                                                              AsyncIndexStats indexStats,
                                                              AtomicBoolean stopFlag) {
             return new SpecialAsyncUpdateCallback(store, name, leaseTimeOut,
-                    checkpoint, indexStats, stopFlag, listener);
+                    checkpoint, indexStats, stopFlag, listener, clock);
         }
     }
 
     private static class SpecialAsyncUpdateCallback extends AsyncUpdateCallback {
 
         private IndexStatusListener listener;
+        private final Clock clock;
 
         public SpecialAsyncUpdateCallback(NodeStore store, String name,
                                           long leaseTimeOut, String checkpoint,
-                                          AsyncIndexStats indexStats, AtomicBoolean stopFlag, IndexStatusListener listener) {
+                                          AsyncIndexStats indexStats, AtomicBoolean stopFlag, IndexStatusListener listener, Clock clock) {
             super(store, name, leaseTimeOut, checkpoint, indexStats, stopFlag);
             this.listener = listener;
+            this.clock = clock;
         }
 
         @Override
@@ -480,12 +546,23 @@ public class AsyncIndexUpdateLeaseTest e
         }
 
         @Override
+        public void traversedNode() throws CommitFailedException {
+            listener.preTraverseNode();
+            super.traversedNode();
+            listener.postTraverseNode();
+        }
+
+        @Override
         void close() throws CommitFailedException {
             listener.preClose();
             super.close();
             listener.postClose();
         }
 
+        @Override
+        protected long getTime() {
+            return clock.getTime();
+        }
     }
 
     private abstract static class IndexStatusListener {
@@ -502,6 +579,12 @@ public class AsyncIndexUpdateLeaseTest e
         protected void postIndexUpdate() {
         }
 
+        protected void preTraverseNode() {
+        }
+
+        protected void postTraverseNode() {
+        }
+
         protected void preClose() {
         }