You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2014/06/16 07:27:04 UTC
svn commit: r1602798 - in /jackrabbit/oak/branches/1.0: ./
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/
Author: jukka
Date: Mon Jun 16 05:27:04 2014
New Revision: 1602798
URL: http://svn.apache.org/r1602798
Log:
1.0.1: Merged revisions 1602796 and 1602797 (OAK-1877)
Modified:
jackrabbit/oak/branches/1.0/ (props changed)
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
Merged /jackrabbit/oak/trunk:r1602796-1602797
Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1602798&r1=1602797&r2=1602798&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Mon Jun 16 05:27:04 2014
@@ -21,11 +21,9 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_DONE;
-import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_RUNNING;
import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
-import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
import java.util.Calendar;
import java.util.HashSet;
@@ -36,18 +34,15 @@ import javax.annotation.Nonnull;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
-import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
import org.apache.jackrabbit.oak.plugins.commit.AnnotatingConflictHandler;
import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
-import org.apache.jackrabbit.oak.plugins.value.Conversions;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
import org.apache.jackrabbit.oak.spi.commit.EditorHook;
-import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
@@ -75,8 +70,9 @@ public class AsyncIndexUpdate implements
"Async", 1, "Concurrent update detected");
/**
- * Timeout in minutes after which an async job would be considered as timed out. Another
- * node in cluster would wait for timeout before taking over a running job
+ * Timeout in minutes after which an async job would be considered as
+ * timed out. Another node in cluster would wait for timeout before
+ * taking over a running job
*/
private static final int ASYNC_TIMEOUT = 15;
@@ -125,13 +121,55 @@ public class AsyncIndexUpdate implements
*/
private class AsyncUpdateCallback implements IndexUpdateCallback {
- private boolean dirty = false;
+ /** The base checkpoint */
+ private final String checkpoint;
+
+ /** Expiration time of the last lease we committed */
+ private long lease;
+
+ private long updates = 0;
+
+ public AsyncUpdateCallback(String checkpoint)
+ throws CommitFailedException {
+ long now = System.currentTimeMillis();
+ this.checkpoint = checkpoint;
+ this.lease = now + 2 * ASYNC_TIMEOUT;
+
+ String leaseName = name + "-lease";
+ NodeState root = store.getRoot();
+ long beforeLease = root.getChildNode(ASYNC).getLong(leaseName);
+ if (beforeLease > now) {
+ throw CONCURRENT_UPDATE;
+ }
+
+ NodeBuilder builder = root.builder();
+ builder.child(ASYNC).setProperty(leaseName, lease);
+ mergeWithConcurrencyCheck(builder, checkpoint, beforeLease);
+ }
+
+ boolean isDirty() {
+ return updates > 0;
+ }
+
+ void close() throws CommitFailedException {
+ NodeBuilder builder = store.getRoot().builder();
+ NodeBuilder async = builder.child(ASYNC);
+ async.removeProperty(name + "-lease");
+ mergeWithConcurrencyCheck(builder, async.getString(name), lease);
+ }
@Override
public void indexUpdate() throws CommitFailedException {
- if (!dirty) {
- dirty = true;
- preAsyncRun(store, name);
+ updates++;
+ if (updates % 100 == 0) {
+ long now = System.currentTimeMillis();
+ if (now + ASYNC_TIMEOUT > lease) {
+ long newLease = now + 2 * ASYNC_TIMEOUT;
+ NodeBuilder builder = store.getRoot().builder();
+ builder.child(ASYNC).setProperty(name + "-lease", newLease);
+ mergeWithConcurrencyCheck(builder, checkpoint, lease);
+ lease = newLease;
+ }
}
}
@@ -141,26 +179,33 @@ public class AsyncIndexUpdate implements
public synchronized void run() {
log.debug("Running background index task {}", name);
- if (isAlreadyRunning(store, name)) {
- log.debug("The {} indexer is already running; skipping this update", name);
+ NodeState root = store.getRoot();
+
+ // check for concurrent updates
+ NodeState async = root.getChildNode(ASYNC);
+ if (async.getLong(name + "-lease") > System.currentTimeMillis()) {
+ log.debug("Another copy of the {} index update is already running;"
+ + " skipping this update", name);
return;
}
+ // find the last indexed state, and check if there are recent changes
NodeState before;
- NodeState root = store.getRoot();
- String refCheckpoint = root.getChildNode(ASYNC).getString(name);
- if (refCheckpoint != null) {
- NodeState state = store.retrieve(refCheckpoint);
+ String beforeCheckpoint = async.getString(name);
+ if (beforeCheckpoint != null) {
+ NodeState state = store.retrieve(beforeCheckpoint);
if (state == null) {
log.warn("Failed to retrieve previously indexed checkpoint {};"
- + " rerunning the initial {} index update",
- refCheckpoint, name);
+ + " re-running the initial {} index update",
+ beforeCheckpoint, name);
+ beforeCheckpoint = null;
before = MISSING_NODE;
} else if (noVisibleChanges(state, root)) {
log.debug("No changes since last checkpoint;"
+ " skipping the {} index update", name);
return;
} else {
+ System.err.println("Some visible changes detected");
before = state;
}
} else {
@@ -168,35 +213,74 @@ public class AsyncIndexUpdate implements
before = MISSING_NODE;
}
- String checkpoint = store.checkpoint(lifetime);
- NodeState after = store.retrieve(checkpoint);
+ // there are some recent changes, so let's create a new checkpoint
+ String afterCheckpoint = store.checkpoint(lifetime);
+ NodeState after = store.retrieve(afterCheckpoint);
if (after == null) {
log.warn("Unable to retrieve newly created checkpoint {},"
- + " skipping the {} index update", checkpoint, name);
+ + " skipping the {} index update", afterCheckpoint, name);
return;
}
- NodeBuilder builder = store.getRoot().builder();
- NodeBuilder async = builder.child(ASYNC);
+ String checkpointToRelease = afterCheckpoint;
+ try {
+ updateIndex(before, beforeCheckpoint, after, afterCheckpoint);
- AsyncUpdateCallback callback = new AsyncUpdateCallback();
+ // the update succeeded, i.e. it no longer fails
+ if (failing) {
+ log.info("Index update {} no longer fails", name);
+ failing = false;
+ }
+
+ // the update succeeded, so we can release the earlier checkpoint
+ // otherwise the new checkpoint associated with the failed update
+ // will get released in the finally block
+ checkpointToRelease = beforeCheckpoint;
+
+ } catch (CommitFailedException e) {
+ if (e == CONCURRENT_UPDATE) {
+ log.debug("Concurrent update detected in the {} index update", name);
+ } else if (failing) {
+ log.debug("The {} index update is still failing", name, e);
+ } else {
+ log.warn("The {} index update failed", name, e);
+ failing = true;
+ }
+
+ } finally {
+ if (checkpointToRelease != null) { // null during initial indexing
+ store.release(checkpointToRelease);
+ }
+ }
+ }
+
+ private void updateIndex(
+ NodeState before, String beforeCheckpoint,
+ NodeState after, String afterCheckpoint)
+ throws CommitFailedException {
+ // start collecting runtime statistics
preAsyncRunStatsStats(indexStats);
- IndexUpdate indexUpdate = new IndexUpdate(
- provider, name, after, builder, callback);
- CommitFailedException exception = EditorDiff.process(
- indexUpdate, before, after);
- if (exception == null) {
- if (callback.dirty) {
- async.setProperty(name, checkpoint);
- try {
- store.merge(builder, newCommitHook(name, refCheckpoint),
- CommitInfo.EMPTY);
- } catch (CommitFailedException e) {
- if (e != CONCURRENT_UPDATE) {
- exception = e;
- }
- }
+ // create an update callback for tracking index updates
+ // and maintaining the update lease
+ AsyncUpdateCallback callback =
+ new AsyncUpdateCallback(beforeCheckpoint);
+ try {
+ NodeBuilder builder = store.getRoot().builder();
+
+ IndexUpdate indexUpdate =
+ new IndexUpdate(provider, name, after, builder, callback);
+ CommitFailedException exception =
+ EditorDiff.process(indexUpdate, before, after);
+ if (exception != null) {
+ throw exception;
+ }
+
+ if (callback.isDirty() || before == MISSING_NODE) {
+ builder.child(ASYNC).setProperty(name, afterCheckpoint);
+ mergeWithConcurrencyCheck(
+ builder, beforeCheckpoint, callback.lease);
+
if (switchOnSync) {
reindexedDefinitions.addAll(
indexUpdate.getReindexedDefinitions());
@@ -204,9 +288,9 @@ public class AsyncIndexUpdate implements
postAsyncRunStatsStatus(indexStats);
}
} else if (switchOnSync) {
- log.debug("No changes detected after diff, will try to switch to synchronous updates on "
- + reindexedDefinitions);
- async.setProperty(name, checkpoint);
+ log.debug("No changes detected after diff; will try to"
+ + " switch to synchronous updates on {}",
+ reindexedDefinitions);
// no changes after diff, switch to sync on the async defs
for (String path : reindexedDefinitions) {
@@ -219,126 +303,47 @@ public class AsyncIndexUpdate implements
}
}
- try {
- store.merge(builder, newCommitHook(name, refCheckpoint),
- CommitInfo.EMPTY);
- reindexedDefinitions.clear();
- postAsyncRunStatsStatus(indexStats);
- } catch (CommitFailedException e) {
- if (e != CONCURRENT_UPDATE) {
- exception = e;
- }
- }
- }
- }
-
- // checkpoints cleanup
- if (exception != null || (exception == null && !callback.dirty)) {
- log.debug("The {} index update failed; releasing the related checkpoint {}",
- name, checkpoint);
- store.release(checkpoint);
- } else {
- if (refCheckpoint != null) {
- log.debug(
- "The {} index update succeeded; releasing the previous checkpoint {}",
- name, refCheckpoint);
- store.release(refCheckpoint);
+ mergeWithConcurrencyCheck(
+ builder, beforeCheckpoint, callback.lease);
+ reindexedDefinitions.clear();
}
+ } finally {
+ callback.close();
}
- if (exception != null) {
- if (!failing) {
- log.warn("Index update {} failed", name, exception);
- }
- failing = true;
- } else {
- if (failing) {
- log.info("Index update {} no longer fails", name);
- }
- failing = false;
- }
+ postAsyncRunStatsStatus(indexStats);
}
- private static CommitHook newCommitHook(
- final String name, final String checkpoint) {
- return new CompositeHook(
- new ConflictHook(new AnnotatingConflictHandler()),
- new EditorHook(new ConflictValidatorProvider()),
- new CommitHook() {
+ private void mergeWithConcurrencyCheck(
+ NodeBuilder builder, final String checkpoint, final long lease)
+ throws CommitFailedException {
+ CommitHook concurrentUpdateCheck = new CommitHook() {
@Override @Nonnull
public NodeState processCommit(
NodeState before, NodeState after, CommitInfo info)
throws CommitFailedException {
// check for concurrent updates by this async task
- String checkpointAfterRebase =
- before.getChildNode(ASYNC).getString(name);
- if (Objects.equal(checkpoint, checkpointAfterRebase)) {
- return postAsyncRunNodeStatus(after.builder(), name)
- .getNodeState();
+ NodeState async = before.getChildNode(ASYNC);
+ if (Objects.equal(checkpoint, async.getString(name))
+ && lease == async.getLong(name + "-lease")) {
+ return after;
} else {
throw CONCURRENT_UPDATE;
}
}
- });
- }
-
- private static void preAsyncRun(NodeStore store, String name) throws CommitFailedException {
- NodeBuilder builder = store.getRoot().builder();
- preAsyncRunNodeStatus(builder, name);
- store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
- }
-
- private static boolean isAlreadyRunning(NodeStore store, String name) {
- NodeState indexState = store.getRoot().getChildNode(INDEX_DEFINITIONS_NAME);
-
- //Probably the first run
- if (!indexState.exists()) {
- return false;
- }
-
- //Check if already running or timed out
- if (STATUS_RUNNING.equals(indexState.getString(name + "-status"))) {
- PropertyState startTime = indexState.getProperty(name + "-start");
- Calendar start = Conversions.convert(startTime.getValue(Type.DATE)).toCalendar();
- Calendar now = Calendar.getInstance();
- long delta = now.getTimeInMillis() - start.getTimeInMillis();
-
- //Check if the job has timed out and we need to take over
- if (TimeUnit.MILLISECONDS.toMinutes(delta) > ASYNC_TIMEOUT) {
- log.info("Async job found which stated on {} has timed out in {} minutes. " +
- "This node would take over the job.",
- startTime.getValue(Type.DATE), ASYNC_TIMEOUT);
- return false;
- }
- return true;
- }
-
- return false;
- }
-
- private static void preAsyncRunNodeStatus(NodeBuilder builder, String name) {
- String now = now();
- builder.getChildNode(INDEX_DEFINITIONS_NAME)
- .setProperty(name + "-status", STATUS_RUNNING)
- .setProperty(name + "-start", now, Type.DATE)
- .removeProperty(name + "-done");
+ };
+ CompositeHook hooks = new CompositeHook(
+ new ConflictHook(new AnnotatingConflictHandler()),
+ new EditorHook(new ConflictValidatorProvider()),
+ concurrentUpdateCheck);
+ store.merge(builder, hooks, CommitInfo.EMPTY);
}
private static void preAsyncRunStatsStats(AsyncIndexStats stats) {
stats.start(now());
}
- private static NodeBuilder postAsyncRunNodeStatus(
- NodeBuilder builder, String name) {
- String now = now();
- builder.getChildNode(INDEX_DEFINITIONS_NAME)
- .setProperty(name + "-status", STATUS_DONE)
- .setProperty(name + "-done", now, Type.DATE)
- .removeProperty(name + "-start");
- return builder;
- }
-
- private static void postAsyncRunStatsStatus(AsyncIndexStats stats) {
+ private static void postAsyncRunStatsStatus(AsyncIndexStats stats) {
stats.done(now());
}
@@ -398,6 +403,7 @@ public class AsyncIndexUpdate implements
* Checks whether there are no visible changes between the given states.
*/
private static boolean noVisibleChanges(NodeState before, NodeState after) {
+ System.err.println("Checking for visible changes...");
return after.compareAgainstBaseState(before, new NodeStateDiff() {
@Override
public boolean propertyAdded(PropertyState after) {
@@ -430,6 +436,7 @@ public class AsyncIndexUpdate implements
}
private static boolean isHidden(String name) {
+ System.err.println(" checking " + name);
return name.charAt(0) == ':';
}
Modified: jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java?rev=1602798&r1=1602797&r2=1602798&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java Mon Jun 16 05:27:04 2014
@@ -102,7 +102,7 @@ public class IndexUpdate implements Edit
this.provider = parent.provider;
this.async = parent.async;
this.root = parent.root;
- this.builder = parent.builder.child(checkNotNull(name));
+ this.builder = parent.builder.getChildNode(checkNotNull(name));
this.updateCallback = parent.updateCallback;
}
Modified: jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1602798&r1=1602797&r2=1602798&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/branches/1.0/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Mon Jun 16 05:27:04 2014
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.oak.plugins.index;
+import static com.google.common.collect.Sets.newHashSet;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_CONTENT_NODE_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
@@ -50,7 +51,6 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.commit.Editor;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.query.PropertyValues;
-import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.ConflictAnnotatingRebaseDiff;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -371,15 +371,20 @@ public class AsyncIndexUpdateTest {
public void cpCleanupNoChanges() throws Exception {
MemoryNodeStore store = new MemoryNodeStore();
IndexEditorProvider provider = new PropertyIndexEditorProvider();
+ AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
assertTrue("Expecting no checkpoints",
store.listCheckpoints().size() == 0);
// no changes on diff, no checkpoints left behind
- AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
async.run();
- assertTrue("Expecting no checkpoints",
- store.listCheckpoints().size() == 0);
+ Set<String> checkpoints = newHashSet(store.listCheckpoints());
+ assertTrue("Expecting the initial checkpoint",
+ checkpoints.size() == 1);
+
+ async.run();
+ assertEquals("Expecting no checkpoint changes",
+ checkpoints, store.listCheckpoints());
}
@Test