You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2014/06/09 06:24:01 UTC
svn commit: r1601309 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Author: jukka
Date: Mon Jun 9 04:24:01 2014
New Revision: 1601309
URL: http://svn.apache.org/r1601309
Log:
OAK-1877: Hourly async reindexing on an idle instance
Increase the lifetime of async indexer checkpoints to 1000 days (~2.7 years), but
only create them when there actually are some changes in the repository.
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1601309&r1=1601308&r2=1601309&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Mon Jun 9 04:24:01 2014
@@ -19,7 +19,6 @@
package org.apache.jackrabbit.oak.plugins.index;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.jackrabbit.oak.api.Type.STRING;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_DONE;
import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_RUNNING;
@@ -35,8 +34,6 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
-import com.google.common.base.Objects;
-
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
@@ -53,11 +50,14 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.util.ISO8601;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects;
+
public class AsyncIndexUpdate implements Runnable {
private static final Logger log = LoggerFactory
@@ -69,7 +69,7 @@ public class AsyncIndexUpdate implements
*/
private static final String ASYNC = ":async";
- private static final long DEFAULT_LIFETIME = TimeUnit.HOURS.toMillis(1);
+ private static final long DEFAULT_LIFETIME = TimeUnit.DAYS.toMillis(1000);
private static final CommitFailedException CONCURRENT_UPDATE = new CommitFailedException(
"Async", 1, "Concurrent update detected");
@@ -142,43 +142,55 @@ public class AsyncIndexUpdate implements
log.debug("Running background index task {}", name);
if (isAlreadyRunning(store, name)) {
- log.debug("Async job '{}' found to be already running. Skipping", name);
+ log.debug("The {} indexer is already running; skipping this update", name);
return;
}
+ NodeState before;
+ NodeState root = store.getRoot();
+ String refCheckpoint = root.getChildNode(ASYNC).getString(name);
+ if (refCheckpoint != null) {
+ NodeState state = store.retrieve(refCheckpoint);
+ if (state == null) {
+ log.warn("Failed to retrieve previously indexed checkpoint {};"
+ + " rerunning the initial {} index update",
+ refCheckpoint, name);
+ before = MISSING_NODE;
+ } else if (noVisibleChanges(state, root)) {
+ log.debug("No changes since last checkpoint;"
+ + " skipping the {} index update", name);
+ return;
+ } else {
+ before = state;
+ }
+ } else {
+ log.info("Initial {} index update", name);
+ before = MISSING_NODE;
+ }
+
String checkpoint = store.checkpoint(lifetime);
NodeState after = store.retrieve(checkpoint);
if (after == null) {
- log.debug("Unable to retrieve checkpoint {}", checkpoint);
+ log.warn("Unable to retrieve newly created checkpoint {},"
+ + " skipping the {} index update", checkpoint, name);
return;
}
- NodeBuilder builder = after.builder();
+ NodeBuilder builder = store.getRoot().builder();
NodeBuilder async = builder.child(ASYNC);
- String refCheckpoint = null;
-
- NodeState before = null;
- final PropertyState state = async.getProperty(name);
- if (state != null && state.getType() == STRING) {
- refCheckpoint = state.getValue(STRING);
- before = store.retrieve(refCheckpoint);
- }
- if (before == null) {
- before = MISSING_NODE;
- }
AsyncUpdateCallback callback = new AsyncUpdateCallback();
preAsyncRunStatsStats(indexStats);
- IndexUpdate indexUpdate = new IndexUpdate(provider, name, after,
- builder, callback);
+ IndexUpdate indexUpdate = new IndexUpdate(
+ provider, name, after, builder, callback);
- CommitFailedException exception = EditorDiff.process(indexUpdate,
- before, after);
+ CommitFailedException exception = EditorDiff.process(
+ indexUpdate, before, after);
if (exception == null) {
if (callback.dirty) {
async.setProperty(name, checkpoint);
try {
- store.merge(builder, newCommitHook(name, state),
+ store.merge(builder, newCommitHook(name, refCheckpoint),
CommitInfo.EMPTY);
} catch (CommitFailedException e) {
if (e != CONCURRENT_UPDATE) {
@@ -186,8 +198,8 @@ public class AsyncIndexUpdate implements
}
}
if (switchOnSync) {
- reindexedDefinitions.addAll(indexUpdate
- .getReindexedDefinitions());
+ reindexedDefinitions.addAll(
+ indexUpdate.getReindexedDefinitions());
}
} else if (switchOnSync) {
log.debug("No changes detected after diff, will try to switch to synchronous updates on "
@@ -206,7 +218,7 @@ public class AsyncIndexUpdate implements
}
try {
- store.merge(builder, newCommitHook(name, state),
+ store.merge(builder, newCommitHook(name, refCheckpoint),
CommitInfo.EMPTY);
reindexedDefinitions.clear();
} catch (CommitFailedException e) {
@@ -219,12 +231,14 @@ public class AsyncIndexUpdate implements
postAsyncRunStatsStatus(indexStats);
// checkpoints cleanup
- if (refCheckpoint != null) {
- store.release(refCheckpoint);
- }
if (exception != null || (exception == null && !callback.dirty)) {
- // error? cleanup current cp too
+ log.debug("The {} index update failed; releasing the related checkpoint {}",
+ name, checkpoint);
store.release(checkpoint);
+ } else {
+ log.debug("The {} index update succeeded; releasing the previous checkpoint {}",
+ name, refCheckpoint);
+ store.release(refCheckpoint);
}
if (exception != null) {
@@ -240,20 +254,20 @@ public class AsyncIndexUpdate implements
}
}
- private static CommitHook newCommitHook(final String name,
- final PropertyState state) throws CommitFailedException {
+ private static CommitHook newCommitHook(
+ final String name, final String checkpoint) {
return new CompositeHook(
new ConflictHook(new AnnotatingConflictHandler()),
new EditorHook(new ConflictValidatorProvider()),
new CommitHook() {
- @Override
- @Nonnull
- public NodeState processCommit(NodeState before, NodeState after,
- CommitInfo info) throws CommitFailedException {
+ @Override @Nonnull
+ public NodeState processCommit(
+ NodeState before, NodeState after, CommitInfo info)
+ throws CommitFailedException {
// check for concurrent updates by this async task
- PropertyState stateAfterRebase = before.getChildNode(ASYNC)
- .getProperty(name);
- if (Objects.equal(state, stateAfterRebase)) {
+ String checkpointAfterRebase =
+ before.getChildNode(ASYNC).getString(name);
+ if (Objects.equal(checkpoint, checkpointAfterRebase)) {
return postAsyncRunNodeStatus(after.builder(), name)
.getNodeState();
} else {
@@ -309,8 +323,8 @@ public class AsyncIndexUpdate implements
stats.start(now());
}
- private static NodeBuilder postAsyncRunNodeStatus(NodeBuilder builder,
- String name) {
+ private static NodeBuilder postAsyncRunNodeStatus(
+ NodeBuilder builder, String name) {
String now = now();
builder.getChildNode(INDEX_DEFINITIONS_NAME)
.setProperty(name + "-status", STATUS_DONE)
@@ -375,4 +389,43 @@ public class AsyncIndexUpdate implements
}
}
+ /**
+ * Checks whether there are no visible changes between the given states.
+ * <p>
+ * Runs {@code after.compareAgainstBaseState(before, ...)} with a diff
+ * handler that answers {@code true} only for items whose name starts
+ * with {@code ':'} (see {@code isHidden}). A changed child node with a
+ * non-hidden name is compared recursively with the same handler, while
+ * the subtree below a hidden child node is not inspected at all.
+ * <p>
+ * NOTE(review): this relies on {@code compareAgainstBaseState} stopping
+ * and returning {@code false} as soon as a handler method returns
+ * {@code false} -- confirm against the NodeStateDiff contract.
+ *
+ * @param before the base state to compare against
+ * @param after the state whose differences from {@code before} are examined
+ * @return the result of the comparison; intended to be {@code true} when
+ * every difference found involves only hidden names
+ */
+ private static boolean noVisibleChanges(NodeState before, NodeState after) {
+ return after.compareAgainstBaseState(before, new NodeStateDiff() {
+ @Override
+ public boolean propertyAdded(PropertyState after) {
+ return isHidden(after.getName());
+ }
+ @Override
+ public boolean propertyChanged(
+ PropertyState before, PropertyState after) {
+ return isHidden(after.getName());
+ }
+ @Override
+ public boolean propertyDeleted(PropertyState before) {
+ return isHidden(before.getName());
+ }
+ @Override
+ public boolean childNodeAdded(String name, NodeState after) {
+ return isHidden(name);
+ }
+ @Override
+ public boolean childNodeChanged(
+ String name, NodeState before, NodeState after) {
+ return isHidden(name) // a hidden child short-circuits: its subtree is never compared
+ || after.compareAgainstBaseState(before, this); // otherwise recurse with this same handler
+ }
+ @Override
+ public boolean childNodeDeleted(String name, NodeState before) {
+ return isHidden(name);
+ }
+ });
+ }
+
+ private static boolean isHidden(String name) { // hidden means the name begins with ':'
+ return name.charAt(0) == ':'; // NOTE(review): charAt(0) throws on an empty name -- confirm callers never pass ""
+ }
+
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1601309&r1=1601308&r2=1601309&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Mon Jun 9 04:24:01 2014
@@ -269,6 +269,7 @@ public class AsyncIndexUpdateTest {
async.run();
}
});
+
// drain checkpoint permits
checkpoint.acquireUninterruptibly(checkpoint.availablePermits());
// block NodeStore.retrieve()
@@ -276,6 +277,7 @@ public class AsyncIndexUpdateTest {
t.start();
// wait until async update called checkpoint
+ retrieve.release();
checkpoint.acquireUninterruptibly();
builder = store.getRoot().builder();
builder.child("child").remove();
@@ -334,8 +336,13 @@ public class AsyncIndexUpdateTest {
locks.put(t, s);
t.start();
+ // make some unrelated changes to trigger indexing
+ builder = store.getRoot().builder();
+ builder.setChildNode("dummy").setProperty("foo", "bar");
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
while (!s.hasQueuedThreads()) {
- // busy wait
+ Thread.yield();
}
// introduce a conflict