You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2015/12/08 09:44:18 UTC
svn commit: r1718533 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/index/
test/java/org/apache/jackrabbit/oak/plugins/index/
Author: alexparvulescu
Date: Tue Dec 8 08:44:18 2015
New Revision: 1718533
URL: http://svn.apache.org/viewvc?rev=1718533&view=rev
Log:
OAK-3436 Prevent missing checkpoint due to unstable topology from causing complete reindexing
- cosmetic changes to make the async index code more testable and a few tests, no fixes yet
Added:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java (with props)
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/package-info.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1718533&r1=1718532&r2=1718533&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Tue Dec 8 08:44:18 2015
@@ -95,7 +95,7 @@ public class AsyncIndexUpdate implements
* timed out. Another node in cluster would wait for timeout before
* taking over a running job
*/
- private static final long ASYNC_TIMEOUT;
+ private static final long DEFAULT_ASYNC_TIMEOUT;
static {
int value = 15;
@@ -105,7 +105,7 @@ public class AsyncIndexUpdate implements
} catch (NumberFormatException e) {
// use default
}
- ASYNC_TIMEOUT = TimeUnit.MINUTES.toMillis(value);
+ DEFAULT_ASYNC_TIMEOUT = TimeUnit.MINUTES.toMillis(value);
}
private final String name;
@@ -137,15 +137,12 @@ public class AsyncIndexUpdate implements
private final MissingIndexProviderStrategy missingStrategy = new DefaultMissingIndexProviderStrategy();
- /**
- * Property name which stores the temporary checkpoint that need to be released on the next run
- */
- private final String tempCpName;
-
private final IndexTaskSpliter taskSplitter = new IndexTaskSpliter();
private IndexMBeanRegistration mbeanRegistration;
+ private long leaseTimeOut;
+
/**
* Controls the length of the interval (in minutes) at which an indexing
* error is logged as 'warning'. for the rest of the indexing cycles errors
@@ -158,10 +155,10 @@ public class AsyncIndexUpdate implements
@Nonnull IndexEditorProvider provider, boolean switchOnSync) {
this.name = checkNotNull(name);
this.lastIndexedTo = name + "-LastIndexedTo";
- this.tempCpName = name + "-temp";
this.store = checkNotNull(store);
this.provider = checkNotNull(provider);
this.switchOnSync = switchOnSync;
+ this.leaseTimeOut = DEFAULT_ASYNC_TIMEOUT;
}
public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
@@ -175,22 +172,47 @@ public class AsyncIndexUpdate implements
*
* @see <a href="https://issues.apache.org/jira/browse/OAK-1292">OAK-1292</a>
*/
- private class AsyncUpdateCallback implements IndexUpdateCallback {
+ protected static class AsyncUpdateCallback implements IndexUpdateCallback {
+
+ private final NodeStore store;
/** The base checkpoint */
private final String checkpoint;
+ private final String afterCheckpoint;
+
+ /**
+ * Property name which stores the temporary checkpoint that needs to be released on the next run
+ */
+ private final String tempCpName;
+
+ private final long leaseTimeOut;
+
+ private final String name;
+
+ private final String leaseName;
+
+ private final AsyncIndexStats indexStats;
+
/** Expiration time of the last lease we committed */
private long lease;
- private final String leaseName;
+ public AsyncUpdateCallback(NodeStore store, String name,
+ long leaseTimeOut, String checkpoint, String afterCheckpoint,
+ AsyncIndexStats indexStats) {
+ this.store = store;
+ this.name = name;
+ this.leaseTimeOut = leaseTimeOut;
+ this.checkpoint = checkpoint;
+ this.afterCheckpoint = afterCheckpoint;
+ this.tempCpName = getTempCpName(name);
+ this.indexStats = indexStats;
+ this.leaseName = leasify(name);
+ }
- public AsyncUpdateCallback(String checkpoint, String afterCheckpoint)
- throws CommitFailedException {
+ protected void prepare() throws CommitFailedException {
long now = System.currentTimeMillis();
- this.checkpoint = checkpoint;
- this.lease = now + 2 * ASYNC_TIMEOUT;
- this.leaseName = name + "-lease";
+ this.lease = now + 2 * leaseTimeOut;
NodeState root = store.getRoot();
long beforeLease = root.getChildNode(ASYNC).getLong(leaseName);
@@ -202,7 +224,7 @@ public class AsyncIndexUpdate implements
NodeBuilder async = builder.child(ASYNC);
async.setProperty(leaseName, lease);
updateTempCheckpoints(async, checkpoint, afterCheckpoint);
- mergeWithConcurrencyCheck(builder, checkpoint, beforeLease);
+ mergeWithConcurrencyCheck(store, builder, checkpoint, beforeLease, name);
// reset updates counter
indexStats.resetUpdates();
@@ -240,23 +262,22 @@ public class AsyncIndexUpdate implements
NodeBuilder builder = store.getRoot().builder();
NodeBuilder async = builder.child(ASYNC);
async.removeProperty(leaseName);
- mergeWithConcurrencyCheck(builder, async.getString(name), lease);
+ mergeWithConcurrencyCheck(store, builder, async.getString(name), lease, name);
}
@Override
public void indexUpdate() throws CommitFailedException {
if (indexStats.incUpdates() % 100 == 0) {
long now = System.currentTimeMillis();
- if (now + ASYNC_TIMEOUT > lease) {
- long newLease = now + 2 * ASYNC_TIMEOUT;
+ if (now + leaseTimeOut > lease) {
+ long newLease = now + 2 * leaseTimeOut;
NodeBuilder builder = store.getRoot().builder();
builder.child(ASYNC).setProperty(leaseName, newLease);
- mergeWithConcurrencyCheck(builder, checkpoint, lease);
+ mergeWithConcurrencyCheck(store, builder, checkpoint, lease, name);
lease = newLease;
}
}
}
-
}
@Override
@@ -270,12 +291,13 @@ public class AsyncIndexUpdate implements
// check for concurrent updates
NodeState async = root.getChildNode(ASYNC);
- long leaseEndTime = async.getLong(name + "-lease");
+ long leaseEndTime = async.getLong(leasify(name));
long currentTime = System.currentTimeMillis();
if (leaseEndTime > currentTime) {
- log.debug(
- "[{}] Another copy of the index update is already running; skipping this update. Time left for lease to expire {}s",
- name, (leaseEndTime - currentTime) / 1000);
+ long leaseExpMsg = (leaseEndTime - currentTime) / 1000;
+ String err = "Another copy of the index update is already running; skipping this update. Time left for lease to expire "
+ + leaseExpMsg + "s";
+ indexStats.failed(new Exception(err, CONCURRENT_UPDATE));
return;
}
@@ -331,7 +353,6 @@ public class AsyncIndexUpdate implements
log.trace("Switching thread name to {}", newThreadName);
threadNameChanged = true;
Thread.currentThread().setName(newThreadName);
-
updatePostRunStatus = updateIndex(before, beforeCheckpoint,
after, afterCheckpoint, afterTime);
@@ -373,8 +394,14 @@ public class AsyncIndexUpdate implements
}
}
- private boolean updateIndex(
- NodeState before, String beforeCheckpoint,
+ protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
+ String name, long leaseTimeOut, String beforeCheckpoint,
+ String afterCheckpoint, AsyncIndexStats indexStats) {
+ return new AsyncUpdateCallback(store, name, leaseTimeOut,
+ beforeCheckpoint, afterCheckpoint, indexStats);
+ }
+
+ private boolean updateIndex(NodeState before, String beforeCheckpoint,
NodeState after, String afterCheckpoint, String afterTime)
throws CommitFailedException {
Stopwatch watch = Stopwatch.createStarted();
@@ -382,8 +409,9 @@ public class AsyncIndexUpdate implements
boolean progressLogged = false;
// create an update callback for tracking index updates
// and maintaining the update lease
- AsyncUpdateCallback callback =
- new AsyncUpdateCallback(beforeCheckpoint, afterCheckpoint);
+ AsyncUpdateCallback callback = newAsyncUpdateCallback(store, name,
+ leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats);
+ callback.prepare();
// check for index tasks split requests, if a split happened, make
// sure to not delete the reference checkpoint, as the other index
@@ -431,7 +459,8 @@ public class AsyncIndexUpdate implements
}
updatePostRunStatus = true;
}
- mergeWithConcurrencyCheck(builder, beforeCheckpoint, callback.lease);
+ mergeWithConcurrencyCheck(store, builder, beforeCheckpoint,
+ callback.lease, name);
if (indexUpdate.isReindexingPerformed()) {
log.info("[{}] Reindexing completed for indexes: {} in {}",
name, indexUpdate.getReindexStats(), watch);
@@ -454,9 +483,17 @@ public class AsyncIndexUpdate implements
return updatePostRunStatus;
}
- private void mergeWithConcurrencyCheck(
- NodeBuilder builder, final String checkpoint, final long lease)
- throws CommitFailedException {
+ private static String leasify(String name) {
+ return name + "-lease";
+ }
+
+ private static String getTempCpName(String name) {
+ return name + "-temp";
+ }
+
+ private static void mergeWithConcurrencyCheck(final NodeStore store,
+ NodeBuilder builder, final String checkpoint, final long lease,
+ final String name) throws CommitFailedException {
CommitHook concurrentUpdateCheck = new CommitHook() {
@Override @Nonnull
public NodeState processCommit(
@@ -465,7 +502,7 @@ public class AsyncIndexUpdate implements
// check for concurrent updates by this async task
NodeState async = before.getChildNode(ASYNC);
if (checkpoint == null || Objects.equal(checkpoint, async.getString(name))
- && lease == async.getLong(name + "-lease")) {
+ && lease == async.getLong(leasify(name))) {
return after;
} else {
throw CONCURRENT_UPDATE;
@@ -488,6 +525,14 @@ public class AsyncIndexUpdate implements
}
}
+ /**
+ * Milliseconds for the timeout
+ */
+ protected AsyncIndexUpdate setLeaseTimeOut(long leaseTimeOut) {
+ this.leaseTimeOut = leaseTimeOut;
+ return this;
+ }
+
private static void preAsyncRunStatsStats(AsyncIndexStats stats) {
stats.start(now());
}
@@ -919,6 +964,7 @@ public class AsyncIndexUpdate implements
private void split(@CheckForNull String refCheckpoint, long lease) throws CommitFailedException {
NodeBuilder builder = store.getRoot().builder();
if (refCheckpoint != null) {
+ String tempCpName = getTempCpName(name);
NodeBuilder async = builder.child(ASYNC);
// add new reference
async.setProperty(newIndexTaskName, refCheckpoint);
@@ -948,7 +994,7 @@ public class AsyncIndexUpdate implements
}
if (!updated.isEmpty()) {
- mergeWithConcurrencyCheck(builder, refCheckpoint, lease);
+ mergeWithConcurrencyCheck(store, builder, refCheckpoint, lease, name);
log.info(
"[{}] Successfully split index definitions {} to async task named {} with referenced checkpoint {}.",
name, updated, newIndexTaskName, refCheckpoint);
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/package-info.java?rev=1718533&r1=1718532&r2=1718533&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/package-info.java Tue Dec 8 08:44:18 2015
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-@Version("3.0.0")
+@Version("3.1.0")
@Export(optional = "provide:=true")
package org.apache.jackrabbit.oak.plugins.index;
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java?rev=1718533&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java Tue Dec 8 08:44:18 2015
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.jackrabbit.oak.NodeStoreFixture;
+import org.apache.jackrabbit.oak.OakBaseTest;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncIndexStats;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncUpdateCallback;
+import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+public class AsyncIndexUpdateLeaseTest extends OakBaseTest {
+
+ private final String name = "async";
+ private MemoryNodeStore store;
+ private IndexEditorProvider provider;
+
+ private final AtomicBoolean executed = new AtomicBoolean(false);
+
+ public AsyncIndexUpdateLeaseTest(NodeStoreFixture fixture) {
+ super(fixture);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ store = new MemoryNodeStore();
+ provider = new PropertyIndexEditorProvider();
+ NodeBuilder builder = store.getRoot().builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, name);
+ builder.child("testRoot").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ executed.set(false);
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ assertTrue("Test method was not executed", executed.get());
+ String referenced = getReferenceCp(store, name);
+ assertNotNull("Reference checkpoint doesn't exist", referenced);
+ assertNotNull(
+ "Failed indexer must not clean successful indexer's checkpoint",
+ store.retrieve(referenced));
+ }
+
+ @Test
+ @Ignore("OAK-3436")
+ public void testPrePrepare() throws Exception {
+ // take care of initial reindex before
+ new AsyncIndexUpdate(name, store, provider).run();
+
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void prePrepare() {
+ executed.set(true);
+ assertRunOk(new AsyncIndexUpdate(name, store, provider));
+ }
+ };
+ assertRunKo(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+ }
+
+ @Test
+ public void testPostPrepare() {
+ // take care of initial reindex before
+ new AsyncIndexUpdate(name, store, provider).run();
+
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void postPrepare() {
+ executed.set(true);
+ // lease must prevent this run
+ assertRunKo(new AsyncIndexUpdate(name, store, provider));
+ }
+ };
+ assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+ }
+
+ @Test
+ public void testPreIndexUpdate() throws Exception {
+ // take care of initial reindex before
+ new AsyncIndexUpdate(name, store, provider).run();
+
+ testContent(store);
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void preIndexUpdate() {
+ executed.set(true);
+ assertRunKo(new AsyncIndexUpdate(name, store, provider));
+ }
+ };
+ assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+ }
+
+ @Test
+ public void testPostIndexUpdate() throws Exception {
+ // take care of initial reindex before
+ new AsyncIndexUpdate(name, store, provider).run();
+
+ testContent(store);
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void postIndexUpdate() {
+ executed.set(true);
+ assertRunKo(new AsyncIndexUpdate(name, store, provider));
+ }
+ };
+ assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+ }
+
+ @Test
+ public void testPreClose() throws Exception {
+ // take care of initial reindex before
+ new AsyncIndexUpdate(name, store, provider).run();
+
+ testContent(store);
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void preClose() {
+ executed.set(true);
+ assertRunKo(new AsyncIndexUpdate(name, store, provider));
+ }
+ };
+ assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+ }
+
+ @Test
+ public void testPostPrepareLeaseExpired() throws Exception {
+ // take care of initial reindex before
+ new AsyncIndexUpdate(name, store, provider).run();
+ final long lease = 50; // lease timeout in milliseconds, kept short so the test can outlive it
+
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void postPrepare() {
+ executed.set(true);
+ try {
+ TimeUnit.MILLISECONDS.sleep(lease * 3); // outlast the 2 * lease expiry set by prepare()
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the flag instead of swallowing the interrupt
+ }
+ assertRunOk(new AsyncIndexUpdate(name, store, provider)); // lease expired: a second indexer may run
+ }
+ };
+ assertRunKo(new SpecialAsyncIndexUpdate(name, store, provider, l1)
+ .setLeaseTimeOut(lease)); // the outer run lost its lease and must report a concurrent update
+ }
+
+ @Test
+ public void testPreIndexUpdateLeaseExpired() throws Exception {
+ // take care of initial reindex before
+ new AsyncIndexUpdate(name, store, provider).run();
+
+ // add extra indexed content
+ testContent(store);
+
+ final long lease = 50; // lease timeout in milliseconds, kept short so the test can outlive it
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void preIndexUpdate() {
+ executed.set(true);
+ try {
+ TimeUnit.MILLISECONDS.sleep(lease * 3); // outlast the 2 * lease expiry set by prepare()
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the flag instead of swallowing the interrupt
+ }
+ assertRunOk(new AsyncIndexUpdate(name, store, provider)); // lease expired: a second indexer may run
+ }
+ };
+ assertRunKo(new SpecialAsyncIndexUpdate(name, store, provider, l1)
+ .setLeaseTimeOut(lease)); // the outer run lost its lease and must report a concurrent update
+ }
+
+ @Test
+ public void testPostIndexUpdateLeaseExpired() throws Exception {
+ // take care of initial reindex before
+ new AsyncIndexUpdate(name, store, provider).run();
+
+ // add extra indexed content
+ testContent(store);
+
+ final long lease = 50; // lease timeout in milliseconds, kept short so the test can outlive it
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void postIndexUpdate() {
+ executed.set(true);
+ try {
+ TimeUnit.MILLISECONDS.sleep(lease * 3); // outlast the 2 * lease expiry set by prepare()
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the flag instead of swallowing the interrupt
+ }
+ assertRunOk(new AsyncIndexUpdate(name, store, provider)); // lease expired: a second indexer may run
+ }
+ };
+ assertRunKo(new SpecialAsyncIndexUpdate(name, store, provider, l1)
+ .setLeaseTimeOut(lease)); // the outer run lost its lease and must report a concurrent update
+ }
+
+ @Test
+ public void testPrePrepareRexindex() throws Exception {
+
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void prePrepare() {
+ executed.set(true);
+ assertRunOk(new AsyncIndexUpdate(name, store, provider));
+ }
+ };
+ assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+ }
+
+ @Test
+ public void testPostPrepareReindex() {
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void postPrepare() {
+ executed.set(true);
+ // lease must prevent this run
+ assertRunKo(new AsyncIndexUpdate(name, store, provider));
+ }
+ };
+ assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+ }
+
+ @Test
+ public void testPreIndexUpdateReindex() throws Exception {
+ testContent(store);
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void preIndexUpdate() {
+ executed.set(true);
+ assertRunKo(new AsyncIndexUpdate(name, store, provider));
+ }
+ };
+ assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+ }
+
+ @Test
+ public void testPostIndexUpdateReindex() throws Exception {
+ testContent(store);
+ final IndexStatusListener l1 = new IndexStatusListener() {
+
+ @Override
+ protected void postIndexUpdate() {
+ executed.set(true);
+ assertRunKo(new AsyncIndexUpdate(name, store, provider));
+ }
+ };
+ assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1));
+ }
+
+ // -------------------------------------------------------------------
+
+ private static String getReferenceCp(NodeStore store, String name) {
+ return store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+ .getString(name);
+ }
+
+ private void assertRunOk(AsyncIndexUpdate a) {
+ assertRun(a, false);
+ }
+
+ private void assertRunKo(AsyncIndexUpdate a) {
+ assertRun(a, true);
+ assertConcurrentUpdate(a.getIndexStats());
+ }
+
+ private void assertRun(AsyncIndexUpdate a, boolean status) { // runs 'a' once, then compares isFailing() with 'status'
+ a.run();
+ assertEquals("Unexpected failure flag", status, a.isFailing());
+ }
+
+ private void assertConcurrentUpdate(AsyncIndexStats stats) {
+ assertTrue("Error must be of type 'Concurrent update'", stats
+ .getLatestError().contains("Concurrent update detected"));
+ }
+
+ private static void testContent(NodeStore store) throws Exception {
+ NodeBuilder builder = store.getRoot().builder();
+ builder.child("testRoot").setProperty("foo",
+ "abc " + System.currentTimeMillis());
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ }
+
+ private static class SpecialAsyncIndexUpdate extends AsyncIndexUpdate {
+
+ private final IndexStatusListener listener;
+
+ public SpecialAsyncIndexUpdate(String name, NodeStore store,
+ IndexEditorProvider provider, IndexStatusListener listener) {
+ super(name, store, provider);
+ this.listener = listener;
+ }
+
+ @Override
+ public synchronized void run() {
+ super.run();
+ }
+
+ @Override
+ protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
+ String name, long leaseTimeOut, String checkpoint,
+ String afterCheckpoint, AsyncIndexStats indexStats) {
+ return new SpecialAsyncUpdateCallback(store, name, leaseTimeOut,
+ checkpoint, afterCheckpoint, indexStats, listener);
+ }
+ }
+
+ private static class SpecialAsyncUpdateCallback extends AsyncUpdateCallback { // wraps each callback step with listener hooks
+
+ private final IndexStatusListener listener; // assigned once in the constructor, never replaced
+
+ public SpecialAsyncUpdateCallback(NodeStore store, String name,
+ long leaseTimeOut, String checkpoint, String afterCheckpoint,
+ AsyncIndexStats indexStats, IndexStatusListener listener) {
+ super(store, name, leaseTimeOut, checkpoint, afterCheckpoint,
+ indexStats);
+ this.listener = listener;
+ }
+
+ @Override
+ protected void prepare() throws CommitFailedException {
+ listener.prePrepare();
+ super.prepare();
+ listener.postPrepare(); // only reached when super.prepare() did not throw
+ }
+
+ @Override
+ public void indexUpdate() throws CommitFailedException {
+ listener.preIndexUpdate();
+ super.indexUpdate();
+ listener.postIndexUpdate(); // only reached when super.indexUpdate() did not throw
+ }
+
+ @Override
+ void close() throws CommitFailedException {
+ listener.preClose();
+ super.close();
+ listener.postClose(); // only reached when super.close() did not throw
+ }
+
+ }
+
+ private abstract static class IndexStatusListener {
+
+ protected void prePrepare() {
+ }
+
+ protected void postPrepare() {
+ }
+
+ protected void preIndexUpdate() {
+ }
+
+ protected void postIndexUpdate() {
+ }
+
+ protected void preClose() {
+ }
+
+ protected void postClose() {
+ }
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain