You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2016/08/16 09:20:32 UTC
svn commit: r1756505 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Author: alexparvulescu
Date: Tue Aug 16 09:20:32 2016
New Revision: 1756505
URL: http://svn.apache.org/viewvc?rev=1756505&view=rev
Log:
OAK-4668 Make async index more resilient on documentmk
- oak-core fix
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1756505&r1=1756504&r2=1756505&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Tue Aug 16 09:20:32 2016
@@ -201,9 +201,7 @@ public class AsyncIndexUpdate implements
private final NodeStore store;
/** The base checkpoint */
- private final String checkpoint;
-
- private final String afterCheckpoint;
+ private String checkpoint;
/**
* Property name which stores the temporary checkpoint that need to be released on the next run
@@ -223,21 +221,25 @@ public class AsyncIndexUpdate implements
/** Expiration time of the last lease we committed */
private long lease;
+ private boolean hasLease = false;
+
public AsyncUpdateCallback(NodeStore store, String name,
- long leaseTimeOut, String checkpoint, String afterCheckpoint,
+ long leaseTimeOut, String checkpoint,
AsyncIndexStats indexStats, AtomicBoolean forcedStop) {
this.store = store;
this.name = name;
this.forcedStop = forcedStop;
this.leaseTimeOut = leaseTimeOut;
this.checkpoint = checkpoint;
- this.afterCheckpoint = afterCheckpoint;
this.tempCpName = getTempCpName(name);
this.indexStats = indexStats;
this.leaseName = leasify(name);
}
- protected void prepare() throws CommitFailedException {
+ protected void initLease() throws CommitFailedException {
+ if (hasLease) {
+ return;
+ }
long now = System.currentTimeMillis();
this.lease = now + 2 * leaseTimeOut;
@@ -251,6 +253,18 @@ public class AsyncIndexUpdate implements
NodeBuilder async = builder.child(ASYNC);
async.setProperty(leaseName, lease);
mergeWithConcurrencyCheck(store, builder, checkpoint, beforeLease, name);
+ hasLease = true;
+ }
+
+ protected void prepare(String afterCheckpoint)
+ throws CommitFailedException {
+ if (!hasLease) {
+ initLease();
+ }
+ NodeState root = store.getRoot();
+ NodeBuilder builder = root.builder();
+ NodeBuilder async = builder.child(ASYNC);
+
updateTempCheckpoints(async, checkpoint, afterCheckpoint);
mergeWithConcurrencyCheck(store, builder, checkpoint, lease, name);
@@ -311,6 +325,10 @@ public class AsyncIndexUpdate implements
}
}
}
+
+ public void setCheckpoint(String checkpoint) {
+ this.checkpoint = checkpoint;
+ }
}
@Override
@@ -390,13 +408,34 @@ public class AsyncIndexUpdate implements
// find the last indexed state, and check if there are recent changes
NodeState before;
String beforeCheckpoint = async.getString(name);
+ AsyncUpdateCallback callback = newAsyncUpdateCallback(store,
+ name, leaseTimeOut, beforeCheckpoint, indexStats,
+ forcedStopFlag);
if (beforeCheckpoint != null) {
NodeState state = store.retrieve(beforeCheckpoint);
if (state == null) {
+ // to make sure we're not reading a stale root rev, we're
+ // attempting a write+read via the lease-grab mechanics
+ try {
+ callback.initLease();
+ } catch (CommitFailedException e) {
+ indexStats.failed(e);
+ return;
+ }
+ root = store.getRoot();
+ beforeCheckpoint = root.getChildNode(ASYNC).getString(name);
+ if (beforeCheckpoint != null) {
+ state = store.retrieve(beforeCheckpoint);
+ callback.setCheckpoint(beforeCheckpoint);
+ }
+ }
+
+ if (state == null) {
log.warn(
"[{}] Failed to retrieve previously indexed checkpoint {}; re-running the initial index update",
name, beforeCheckpoint);
beforeCheckpoint = null;
+ callback.setCheckpoint(beforeCheckpoint);
before = MISSING_NODE;
} else if (noVisibleChanges(state, root)) {
log.debug(
@@ -436,8 +475,8 @@ public class AsyncIndexUpdate implements
log.trace("Switching thread name to {}", newThreadName);
threadNameChanged = true;
Thread.currentThread().setName(newThreadName);
- updatePostRunStatus = updateIndex(before, beforeCheckpoint,
- after, afterCheckpoint, afterTime);
+ updatePostRunStatus = updateIndex(before, beforeCheckpoint, after,
+ afterCheckpoint, afterTime, callback);
// the update succeeded, i.e. it no longer fails
if (indexStats.isFailing()) {
@@ -479,23 +518,21 @@ public class AsyncIndexUpdate implements
protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
String name, long leaseTimeOut, String beforeCheckpoint,
- String afterCheckpoint, AsyncIndexStats indexStats,
+ AsyncIndexStats indexStats,
AtomicBoolean stopFlag) {
return new AsyncUpdateCallback(store, name, leaseTimeOut,
- beforeCheckpoint, afterCheckpoint, indexStats, stopFlag);
+ beforeCheckpoint, indexStats, stopFlag);
}
- private boolean updateIndex(NodeState before, String beforeCheckpoint,
- NodeState after, String afterCheckpoint, String afterTime)
- throws CommitFailedException {
+ protected boolean updateIndex(NodeState before, String beforeCheckpoint,
+ NodeState after, String afterCheckpoint, String afterTime,
+ AsyncUpdateCallback callback) throws CommitFailedException {
Stopwatch watch = Stopwatch.createStarted();
boolean updatePostRunStatus = true;
boolean progressLogged = false;
- // create an update callback for tracking index updates
+ // prepare the update callback for tracking index updates
// and maintaining the update lease
- AsyncUpdateCallback callback = newAsyncUpdateCallback(store, name,
- leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats, forcedStopFlag);
- callback.prepare();
+ callback.prepare(afterCheckpoint);
// check for index tasks split requests, if a split happened, make
// sure to not delete the reference checkpoint, as the other index
@@ -567,7 +604,7 @@ public class AsyncIndexUpdate implements
return updatePostRunStatus;
}
- private static String leasify(String name) {
+ static String leasify(String name) {
return name + "-lease";
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1756505&r1=1756504&r2=1756505&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Tue Aug 16 09:20:32 2016
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.plugins.index;
import static com.google.common.collect.Sets.newHashSet;
+import static org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.ASYNC;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_CONTENT_NODE_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
@@ -27,6 +28,7 @@ import static org.hamcrest.CoreMatchers.
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -41,12 +43,14 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.management.openmbean.CompositeData;
+import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
@@ -62,6 +66,7 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.Editor;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.query.PropertyValues;
import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
@@ -442,7 +447,7 @@ public class AsyncIndexUpdateTest {
builder = store.getRoot().builder();
// change cp ref to point to a non-existing one
- builder.child(AsyncIndexUpdate.ASYNC).setProperty("async", "faulty");
+ builder.child(ASYNC).setProperty("async", "faulty");
builder.child("testAnother").setProperty("foo", "def");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
@@ -466,7 +471,7 @@ public class AsyncIndexUpdateTest {
Set<String> checkpoints = newHashSet(store.listCheckpoints());
assertTrue("Expecting the initial checkpoint",
checkpoints.size() == 1);
- assertEquals(store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+ assertEquals(store.getRoot().getChildNode(ASYNC)
.getString("async"), checkpoints.iterator().next());
async.run();
@@ -507,7 +512,7 @@ public class AsyncIndexUpdateTest {
secondCp.equals(firstCp));
assertEquals(
secondCp,
- store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+ store.getRoot().getChildNode(ASYNC)
.getString("async"));
}
@@ -545,7 +550,7 @@ public class AsyncIndexUpdateTest {
secondCp.equals(firstCp));
assertEquals(
secondCp,
- store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+ store.getRoot().getChildNode(ASYNC)
.getString("async"));
}
@@ -631,7 +636,7 @@ public class AsyncIndexUpdateTest {
assertTrue(
"Expecting one temp checkpoint",
newHashSet(
- store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+ store.getRoot().getChildNode(ASYNC)
.getStrings("async-temp")).size() == 1);
builder = store.getRoot().builder();
@@ -644,7 +649,7 @@ public class AsyncIndexUpdateTest {
assertTrue(
"Expecting two temp checkpoints",
newHashSet(
- store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+ store.getRoot().getChildNode(ASYNC)
.getStrings("async-temp")).size() == 2);
canRelease.set(true);
@@ -659,11 +664,11 @@ public class AsyncIndexUpdateTest {
String secondCp = mns.listCheckpoints().iterator().next();
assertEquals(
secondCp,
- store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+ store.getRoot().getChildNode(ASYNC)
.getString("async"));
// the temp cps size is 2 now but the unreferenced checkpoints have been
// cleared from the store already
- for (String cp : store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+ for (String cp : store.getRoot().getChildNode(ASYNC)
.getStrings("async-temp")) {
if (cp.equals(secondCp)) {
continue;
@@ -702,7 +707,7 @@ public class AsyncIndexUpdateTest {
String firstCp = store.listCheckpoints().iterator().next();
assertEquals(
firstCp,
- store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+ store.getRoot().getChildNode(ASYNC)
.getString("asyncMissing"));
// second run, simulate an index going away
@@ -724,7 +729,7 @@ public class AsyncIndexUpdateTest {
secondCp.equals(firstCp));
assertEquals(
firstCp,
- store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC)
+ store.getRoot().getChildNode(ASYNC)
.getString("asyncMissing"));
}
@@ -847,7 +852,7 @@ public class AsyncIndexUpdateTest {
String secondCp = checkpoints.iterator().next();
NodeState asyncNode = store.getRoot().getChildNode(
- AsyncIndexUpdate.ASYNC);
+ ASYNC);
assertEquals(firstCp, asyncNode.getString("async-slow"));
assertEquals(secondCp, asyncNode.getString("async"));
assertFalse(newHashSet(asyncNode.getStrings("async-temp")).contains(
@@ -914,7 +919,7 @@ public class AsyncIndexUpdateTest {
String secondCp = checkpoints.iterator().next();
NodeState asyncNode = store.getRoot().getChildNode(
- AsyncIndexUpdate.ASYNC);
+ ASYNC);
assertEquals(secondCp, asyncNode.getString("async"));
assertNull(firstCp, asyncNode.getString("async-slow"));
@@ -1067,13 +1072,13 @@ public class AsyncIndexUpdateTest {
final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
@Override
protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
- String beforeCheckpoint, String afterCheckpoint,
+ String beforeCheckpoint,
AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
try {
asyncLock.acquire();
} catch (InterruptedException ignore) {
}
- return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+ return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint,
indexStats, stopFlag);
}
};
@@ -1137,13 +1142,13 @@ public class AsyncIndexUpdateTest {
final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
@Override
protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
- String beforeCheckpoint, String afterCheckpoint,
+ String beforeCheckpoint,
AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
try {
asyncLock.acquire();
} catch (InterruptedException ignore) {
}
- return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+ return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint,
indexStats, stopFlag);
}
};
@@ -1211,9 +1216,9 @@ public class AsyncIndexUpdateTest {
final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
@Override
protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
- String beforeCheckpoint, String afterCheckpoint,
+ String beforeCheckpoint,
AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
- return new AsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+ return new AsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint,
indexStats, stopFlag){
@Override
@@ -1310,4 +1315,144 @@ public class AsyncIndexUpdateTest {
fail("RetryLoop failed, condition is false after " + timeoutSeconds + " seconds: ");
}
+ @Test
+ public void greedyLeaseReindex() throws Exception {
+
+ MemoryNodeStore store = new MemoryNodeStore();
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+ NodeBuilder builder = store.getRoot().builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ builder.child("testRoot").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ AsyncIndexUpdate pre = new AsyncIndexUpdate("async", store, provider);
+ pre.run();
+ pre.close();
+
+ // rm all cps to simulate 'missing cp scenario'
+ for (String cp : store.listCheckpoints()) {
+ store.release(cp);
+ }
+
+ final AtomicBoolean greedyLease = new AtomicBoolean(false);
+ final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store,
+ provider) {
+ @Override
+ protected AsyncUpdateCallback newAsyncUpdateCallback(
+ NodeStore store, String name, long leaseTimeOut,
+ String beforeCheckpoint, AsyncIndexStats indexStats,
+ AtomicBoolean stopFlag) {
+ return new AsyncUpdateCallback(store, name, leaseTimeOut,
+ beforeCheckpoint, indexStats, stopFlag) {
+
+ @Override
+ protected void initLease() throws CommitFailedException {
+ greedyLease.set(true);
+ super.initLease();
+ }
+
+ @Override
+ protected void prepare(String afterCheckpoint)
+ throws CommitFailedException {
+ assertTrue(greedyLease.get());
+ super.prepare(afterCheckpoint);
+ }
+ };
+ }
+ };
+ async.run();
+ async.close();
+ assertTrue(greedyLease.get());
+ }
+
+ @Test
+ public void checkpointLostEventualConsistent() throws Exception {
+
+ MemoryNodeStore store = new MemoryNodeStore();
+ final List<NodeState> rootStates = Lists.newArrayList();
+ store.addObserver(new Observer() {
+ @Override
+ public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
+ rootStates.add(root);
+ }
+ });
+
+ IndexEditorProvider provider = new PropertyIndexEditorProvider();
+ NodeBuilder builder = store.getRoot().builder();
+ createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+ "rootIndex", true, false, ImmutableSet.of("foo"), null)
+ .setProperty(ASYNC_PROPERTY_NAME, "async");
+ builder.child("testRoot").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ AsyncIndexUpdate pre = new AsyncIndexUpdate("async", store, provider);
+ pre.run();
+
+ //Create another commit so that we have two checkpoints
+ builder = store.getRoot().builder();
+ builder.child("testRoot2").setProperty("foo", "abc");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ pre.run();
+
+ pre.close();
+
+ //Look for the nodestate just before the final merge in AsyncIndexUpdate
+ //i.e. where older checkpoint was still referred and which has been "released"
+ //post last run
+ Collections.reverse(rootStates);
+ final AtomicReference<NodeState> oldRootState = new AtomicReference<NodeState>();
+ for (NodeState ns : rootStates) {
+ NodeState async = ns.getChildNode(ASYNC);
+ String checkpointName = async.getString("async");
+ if (store.retrieve(checkpointName) == null &&
+ async.getProperty(AsyncIndexUpdate.leasify("async")) == null){
+ oldRootState.set(ns);
+ break;
+ }
+ }
+
+ assertNotNull(oldRootState.get());
+
+ final AtomicBoolean intiLeaseCalled = new AtomicBoolean(false);
+ //Here for the call to read existing NodeState we would return the old
+ //"stale" state where we have a stale checkpoint
+ store = new MemoryNodeStore(store.getRoot()) {
+ @Override
+ public NodeState getRoot() {
+ //Keep returning stale view untill initlease is not invoked
+ if (!intiLeaseCalled.get()) {
+ return oldRootState.get();
+ }
+ return super.getRoot();
+ }
+ };
+
+ final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider){
+ @Override
+ protected AsyncUpdateCallback newAsyncUpdateCallback(
+ NodeStore store, String name, long leaseTimeOut,
+ String beforeCheckpoint, AsyncIndexStats indexStats,
+ AtomicBoolean stopFlag) {
+ return new AsyncUpdateCallback(store, name, leaseTimeOut,
+ beforeCheckpoint, indexStats, stopFlag) {
+
+ @Override
+ protected void initLease() throws CommitFailedException {
+ intiLeaseCalled.set(true);
+ super.initLease();
+ }
+ };
+ }
+ };
+ async.run();
+
+ //This run should fail
+ assertTrue(async.getIndexStats().isFailing());
+ async.close();
+ }
+
+
+
}