You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2017/05/02 06:44:00 UTC
svn commit: r1793435 - in /jackrabbit/oak/trunk:
oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/
oak-it/src/test/ja...
Author: chetanm
Date: Tue May 2 06:43:59 2017
New Revision: 1793435
URL: http://svn.apache.org/viewvc?rev=1793435&view=rev
Log:
OAK-5893 - Async index abort should work even during traversals without index updates
-- Introduced a new NodeTraversalCallback which would be called for every
changed node
-- Exposed node read count as part of IndexStatsMBean
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/NodeTraversalCallback.java (with props)
Modified:
jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
Modified: jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1793435&r1=1793434&r2=1793435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java (original)
+++ jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java Tue May 2 06:43:59 2017
@@ -104,6 +104,15 @@ public interface IndexStatsMBean {
long getUpdates();
/**
+ * Returns the number of which have been read so far. This value is
+ * kept until the next cycle begins.
+ *
+ * @return the number of node read from the current run cycle. This value is
+ * kept until the next cycle begins.
+ */
+ long getNodesReadCount();
+
+ /**
* Returns the current reference checkpoint used by the async indexer
*
* @return the reference checkpoint
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1793435&r1=1793434&r2=1793435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Tue May 2 06:43:59 2017
@@ -236,8 +236,12 @@ public class AsyncIndexUpdate implements
*
* @see <a href="https://issues.apache.org/jira/browse/OAK-1292">OAK-1292</a>
*/
- protected static class AsyncUpdateCallback implements IndexUpdateCallback {
-
+ protected static class AsyncUpdateCallback implements IndexUpdateCallback, NodeTraversalCallback {
+ /**
+ * Interval in terms of number of nodes traversed after which
+ * time would be checked for lease expiry
+ */
+ public static final int LEASE_CHECK_INTERVAL = 10;
private final NodeStore store;
/** The base checkpoint */
@@ -288,7 +292,7 @@ public class AsyncIndexUpdate implements
NodeState root = store.getRoot();
NodeState async = root.getChildNode(ASYNC);
if(isLeaseCheckEnabled(leaseTimeOut)) {
- long now = System.currentTimeMillis();
+ long now = getTime();
this.lease = now + 2 * leaseTimeOut;
long beforeLease = async.getLong(leaseName);
if (beforeLease > now) {
@@ -324,7 +328,7 @@ public class AsyncIndexUpdate implements
mergeWithConcurrencyCheck(store, validatorProviders, builder, checkpoint, lease, name);
// reset updates counter
- indexStats.resetUpdates();
+ indexStats.reset();
}
private void updateTempCheckpoints(NodeBuilder async,
@@ -367,13 +371,16 @@ public class AsyncIndexUpdate implements
@Override
public void indexUpdate() throws CommitFailedException {
- if (forcedStop.get()){
- forcedStop.set(false);
- throw INTERRUPTED;
- }
+ checkIfStopped();
+ indexStats.incUpdates();
+ }
+
+ @Override
+ public void traversedNode() throws CommitFailedException{
+ checkIfStopped();
- if (indexStats.incUpdates() % 100 == 0 && isLeaseCheckEnabled(leaseTimeOut)) {
- long now = System.currentTimeMillis();
+ if (indexStats.incTraversal() % LEASE_CHECK_INTERVAL == 0 && isLeaseCheckEnabled(leaseTimeOut)) {
+ long now = getTime();
if (now + leaseTimeOut > lease) {
long newLease = now + 2 * leaseTimeOut;
NodeBuilder builder = store.getRoot().builder();
@@ -384,6 +391,10 @@ public class AsyncIndexUpdate implements
}
}
+ protected long getTime() {
+ return System.currentTimeMillis();
+ }
+
public void setCheckpoint(String checkpoint) {
this.checkpoint = checkpoint;
}
@@ -391,6 +402,13 @@ public class AsyncIndexUpdate implements
public void setValidatorProviders(List<ValidatorProvider> validatorProviders) {
this.validatorProviders = checkNotNull(validatorProviders);
}
+
+ private void checkIfStopped() throws CommitFailedException {
+ if (forcedStop.get()){
+ forcedStop.set(false);
+ throw INTERRUPTED;
+ }
+ }
}
@Override
@@ -689,7 +707,7 @@ public class AsyncIndexUpdate implements
markFailingIndexesAsCorrupt(builder);
indexUpdate =
- new IndexUpdate(provider, name, after, builder, callback, CommitInfo.EMPTY, corruptIndexHandler)
+ new IndexUpdate(provider, name, after, builder, callback, callback, CommitInfo.EMPTY, corruptIndexHandler)
.withMissingProviderStrategy(missingStrategy);
CommitFailedException exception =
EditorDiff.process(VisibleEditor.wrap(indexUpdate), before, after);
@@ -888,6 +906,7 @@ public class AsyncIndexUpdate implements
private volatile boolean isPaused;
private volatile long updates;
+ private volatile long nodesRead;
private final Stopwatch watch = Stopwatch.createUnstarted();
private final ExecutionStats execStats;
@@ -1034,13 +1053,17 @@ public class AsyncIndexUpdate implements
return this.isPaused;
}
- void resetUpdates() {
+ void reset() {
this.updates = 0;
+ this.nodesRead = 0;
}
long incUpdates() {
- updates++;
- return updates;
+ return ++updates;
+ }
+
+ long incTraversal() {
+ return ++nodesRead;
}
@Override
@@ -1048,6 +1071,11 @@ public class AsyncIndexUpdate implements
return updates;
}
+ @Override
+ public long getNodesReadCount(){
+ return nodesRead;
+ }
+
void setReferenceCheckpoint(String checkpoint) {
this.referenceCp = checkpoint;
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java?rev=1793435&r1=1793434&r2=1793435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java Tue May 2 06:43:59 2017
@@ -133,17 +133,18 @@ public class IndexUpdate implements Edit
IndexEditorProvider provider, String async,
NodeState root, NodeBuilder builder,
IndexUpdateCallback updateCallback, CommitInfo commitInfo) {
- this(provider, async, root, builder, updateCallback, commitInfo, CorruptIndexHandler.NOOP);
+ this(provider, async, root, builder, updateCallback, NodeTraversalCallback.NOOP, commitInfo, CorruptIndexHandler.NOOP);
}
public IndexUpdate(
IndexEditorProvider provider, String async,
NodeState root, NodeBuilder builder,
- IndexUpdateCallback updateCallback, CommitInfo commitInfo, CorruptIndexHandler corruptIndexHandler) {
+ IndexUpdateCallback updateCallback, NodeTraversalCallback traversalCallback,
+ CommitInfo commitInfo, CorruptIndexHandler corruptIndexHandler) {
this.parent = null;
this.name = null;
this.path = "/";
- this.rootState = new IndexUpdateRootState(provider, async, root, updateCallback, commitInfo, corruptIndexHandler);
+ this.rootState = new IndexUpdateRootState(provider, async, root, updateCallback, traversalCallback, commitInfo, corruptIndexHandler);
this.builder = checkNotNull(builder);
}
@@ -157,6 +158,7 @@ public class IndexUpdate implements Edit
@Override
public void enter(NodeState before, NodeState after)
throws CommitFailedException {
+ rootState.nodeRead();
collectIndexEditors(builder.getChildNode(INDEX_DEFINITIONS_NAME), before);
if (!reindex.isEmpty()) {
@@ -369,7 +371,6 @@ public class IndexUpdate implements Edit
@Override @Nonnull
public Editor childNodeAdded(String name, NodeState after)
throws CommitFailedException {
- rootState.nodeRead(name);
List<Editor> children = newArrayListWithCapacity(1 + editors.size());
children.add(new IndexUpdate(this, name));
for (Editor editor : editors) {
@@ -385,7 +386,6 @@ public class IndexUpdate implements Edit
public Editor childNodeChanged(
String name, NodeState before, NodeState after)
throws CommitFailedException {
- rootState.nodeRead(name);
List<Editor> children = newArrayListWithCapacity(1 + editors.size());
children.add(new IndexUpdate(this, name));
for (Editor editor : editors) {
@@ -498,6 +498,7 @@ public class IndexUpdate implements Edit
* Callback for the update events of the indexing job
*/
final IndexUpdateCallback updateCallback;
+ final NodeTraversalCallback traversalCallback;
final Set<String> reindexedIndexes = Sets.newHashSet();
final Map<String, CountingCallback> callbacks = Maps.newHashMap();
final CorruptIndexHandler corruptIndexHandler;
@@ -506,13 +507,15 @@ public class IndexUpdate implements Edit
private MissingIndexProviderStrategy missingProvider = new MissingIndexProviderStrategy();
private IndexUpdateRootState(IndexEditorProvider provider, String async, NodeState root,
- IndexUpdateCallback updateCallback, CommitInfo commitInfo, CorruptIndexHandler corruptIndexHandler) {
+ IndexUpdateCallback updateCallback, NodeTraversalCallback traversalCallback,
+ CommitInfo commitInfo, CorruptIndexHandler corruptIndexHandler) {
this.provider = checkNotNull(provider);
this.async = async;
this.root = checkNotNull(root);
this.updateCallback = checkNotNull(updateCallback);
this.commitInfo = commitInfo;
this.corruptIndexHandler = corruptIndexHandler;
+ this.traversalCallback = traversalCallback;
}
public IndexUpdateCallback newCallback(String indexPath, boolean reindex) {
@@ -571,8 +574,9 @@ public class IndexUpdate implements Edit
return !reindexedIndexes.isEmpty();
}
- public void nodeRead(String name){
+ public void nodeRead() throws CommitFailedException {
changedNodeCount++;
+ traversalCallback.traversedNode();
}
public void propertyChanged(String name){
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java?rev=1793435&r1=1793434&r2=1793435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java Tue May 2 06:43:59 2017
@@ -68,7 +68,7 @@ public class IndexUpdateProvider impleme
NodeState before, NodeState after,
NodeBuilder builder, CommitInfo info) {
- IndexUpdate editor = new IndexUpdate(provider, async, after, builder, NOOP_CALLBACK, info, corruptIndexHandler)
+ IndexUpdate editor = new IndexUpdate(provider, async, after, builder, NOOP_CALLBACK, NodeTraversalCallback.NOOP, info, corruptIndexHandler)
.withMissingProviderStrategy(missingStrategy);
editor.setIgnoreReindexFlags(ignoreReindexFlags);
return VisibleEditor.wrap(editor);
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/NodeTraversalCallback.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/NodeTraversalCallback.java?rev=1793435&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/NodeTraversalCallback.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/NodeTraversalCallback.java Tue May 2 06:43:59 2017
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+
+/**
+ * Callback which invoked for any changed node read by IndexUpdate
+ * as part of diff traversal
+ */
+interface NodeTraversalCallback{
+ NodeTraversalCallback NOOP = new NodeTraversalCallback() {
+ @Override
+ public void traversedNode() throws CommitFailedException {
+
+ }
+ };
+
+
+ void traversedNode() throws CommitFailedException;
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/NodeTraversalCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1793435&r1=1793434&r2=1793435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Tue May 2 06:43:59 2017
@@ -1773,6 +1773,47 @@ public class AsyncIndexUpdateTest {
}
}
+ @Test
+ public void traversalCount() throws Exception{
+ MemoryNodeStore store = new MemoryNodeStore();
+ PropertyIndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+ NodeBuilder builder = store.getRoot().builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ builder.child("testRoot").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+ async.run();
+
+ //Get rid of changes in index nodes i.e. /oak:index/rootIndex
+ async.run();
+
+ //Do a run without any index property change
+ builder = store.getRoot().builder();
+ builder.child("a").child("b");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ async.run();
+
+ AsyncIndexStats stats = async.getIndexStats();
+ assertEquals(3, stats.getNodesReadCount());
+ assertEquals(0, stats.getUpdates());
+
+ //Do a run with a index property change
+ builder = store.getRoot().builder();
+ builder.child("a").child("b").setProperty("foo", "bar");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ async.run();
+
+ stats = async.getIndexStats();
+ assertEquals(3, stats.getNodesReadCount());
+ assertEquals(1, stats.getUpdates());
+ }
+
private static class TestIndexEditorProvider extends PropertyIndexEditorProvider {
private String indexPathToFail;
@Override
Modified: jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java?rev=1793435&r1=1793434&r2=1793435&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java (original)
+++ jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java Tue May 2 06:43:59 2017
@@ -24,9 +24,11 @@ import static org.junit.Assert.assertNot
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.collect.Sets;
import org.apache.jackrabbit.oak.OakBaseTest;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
@@ -37,6 +39,7 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -396,8 +399,52 @@ public class AsyncIndexUpdateLeaseTest e
.hasProperty(AsyncIndexUpdate.leasify(name)));
}
+ @Test
+ public void testLeaseUpdateAndNumberOfChanges() throws Exception {
+ final long lease = 50;
+ testContent(store, AsyncUpdateCallback.LEASE_CHECK_INTERVAL / 2);
+ Set<Long> leaseTimes = Sets.newHashSet();
+ final Clock.Virtual clock = new Clock.Virtual();
+ final IndexStatusListener l1 = new IndexStatusListener() {
+ @Override
+ protected void postPrepare() {
+ collectLeaseTimes();
+ }
+
+ @Override
+ protected void postTraverseNode() {
+ collectLeaseTimes();
+ if (!executed.get()) {
+ clock.waitUntil(clock.getTime() + lease * 3);
+ }
+ executed.set(true);
+ }
+
+ private void collectLeaseTimes() {
+ leaseTimes.add(getLeaseValue());
+ }
+ };
+ assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1, clock)
+ .setLeaseTimeOut(lease));
+
+ assertEquals(1, leaseTimes.size());
+
+ executed.set(false);
+ leaseTimes.clear();
+
+ //Run with changes more than threshold and then lease should change more than once
+ testContent(store, AsyncUpdateCallback.LEASE_CHECK_INTERVAL * 2);
+ assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1, clock)
+ .setLeaseTimeOut(lease));
+ assertTrue(leaseTimes.size() > 1);
+ }
+
// -------------------------------------------------------------------
+ private long getLeaseValue() {
+ return store.getRoot().getChildNode(":async").getLong( AsyncIndexUpdate.leasify(name));
+ }
+
private static String getReferenceCp(NodeStore store, String name) {
return store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
.getString(name);
@@ -429,14 +476,31 @@ public class AsyncIndexUpdateLeaseTest e
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
+ private static void testContent(NodeStore store, int numOfNodes) throws Exception {
+ NodeBuilder builder = store.getRoot().builder();
+ for (int i = 0; i < numOfNodes; i++) {
+ builder.child("testRoot"+i).setProperty("foo",
+ "abc " + System.currentTimeMillis());
+ }
+
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+
private static class SpecialAsyncIndexUpdate extends AsyncIndexUpdate {
private final IndexStatusListener listener;
+ private final Clock clock;
public SpecialAsyncIndexUpdate(String name, NodeStore store,
- IndexEditorProvider provider, IndexStatusListener listener) {
+ IndexEditorProvider provider, IndexStatusListener listener) {
+ this(name, store, provider, listener, Clock.SIMPLE);
+ }
+
+ public SpecialAsyncIndexUpdate(String name, NodeStore store,
+ IndexEditorProvider provider, IndexStatusListener listener, Clock clock) {
super(name, store, provider);
this.listener = listener;
+ this.clock = clock;
}
@Override
@@ -450,19 +514,21 @@ public class AsyncIndexUpdateLeaseTest e
AsyncIndexStats indexStats,
AtomicBoolean stopFlag) {
return new SpecialAsyncUpdateCallback(store, name, leaseTimeOut,
- checkpoint, indexStats, stopFlag, listener);
+ checkpoint, indexStats, stopFlag, listener, clock);
}
}
private static class SpecialAsyncUpdateCallback extends AsyncUpdateCallback {
private IndexStatusListener listener;
+ private final Clock clock;
public SpecialAsyncUpdateCallback(NodeStore store, String name,
long leaseTimeOut, String checkpoint,
- AsyncIndexStats indexStats, AtomicBoolean stopFlag, IndexStatusListener listener) {
+ AsyncIndexStats indexStats, AtomicBoolean stopFlag, IndexStatusListener listener, Clock clock) {
super(store, name, leaseTimeOut, checkpoint, indexStats, stopFlag);
this.listener = listener;
+ this.clock = clock;
}
@Override
@@ -480,12 +546,23 @@ public class AsyncIndexUpdateLeaseTest e
}
@Override
+ public void traversedNode() throws CommitFailedException {
+ listener.preTraverseNode();
+ super.traversedNode();
+ listener.postTraverseNode();
+ }
+
+ @Override
void close() throws CommitFailedException {
listener.preClose();
super.close();
listener.postClose();
}
+ @Override
+ protected long getTime() {
+ return clock.getTime();
+ }
}
private abstract static class IndexStatusListener {
@@ -502,6 +579,12 @@ public class AsyncIndexUpdateLeaseTest e
protected void postIndexUpdate() {
}
+ protected void preTraverseNode() {
+ }
+
+ protected void postTraverseNode() {
+ }
+
protected void preClose() {
}