You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2014/04/02 08:12:42 UTC
svn commit: r1583888 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/document/
main/java/org/apache/jackrabbit/oak/plugins/document/mongo/
test/java/org/apache/jackrabbit/oak/plugins/document/
Author: chetanm
Date: Wed Apr 2 06:12:41 2014
New Revision: 1583888
URL: http://svn.apache.org/r1583888
Log:
OAK-1295 - Recovery for missing _lastRev updates (WIP)
Applying patch from Amit Jain
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (contents, props changed)
- copied, changed from r1583882, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java (with props)
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java (with props)
Removed:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1583888&r1=1583887&r2=1583888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java Wed Apr 2 06:12:41 2014
@@ -16,6 +16,8 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
+import static org.apache.jackrabbit.oak.plugins.document.Document.ID;
+
import java.lang.management.ManagementFactory;
import java.net.NetworkInterface;
import java.util.ArrayList;
@@ -26,11 +28,10 @@ import java.util.UUID;
import org.apache.jackrabbit.mk.api.MicroKernelException;
import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.jackrabbit.oak.plugins.document.Document.ID;
-
/**
* Information about a cluster node.
*/
@@ -57,7 +58,28 @@ public class ClusterNodeInfo {
/**
* The end of the lease.
*/
- private static final String LEASE_END_KEY = "leaseEnd";
+ protected static final String LEASE_END_KEY = "leaseEnd";
+
+ /**
+ * The state of the cluster.
+ * On proper shutdown the state should be cleared.
+ */
+ protected static final String STATE = "state";
+
+ /**
+ * Flag to indicate whether the _lastRev recovery is in progress.
+ */
+ protected static final String REV_RECOVERY_LOCK = "revLock";
+
+ /**
+ * Active State.
+ */
+ private static final String ACTIVE_STATE = "active";
+
+ /**
+ * _lastRev recovery in progress
+ */
+ protected static final String REV_RECOVERY_ON = "true";
/**
* Additional info, such as the process id, for support.
@@ -85,6 +107,11 @@ public class ClusterNodeInfo {
private static final String WORKING_DIR = System.getProperty("user.dir", "");
/**
+ * <b>Only Used For Testing</b>
+ */
+ private static Clock clock;
+
+ /**
* The number of milliseconds for a lease (1 minute by default, and
* initially).
*/
@@ -130,13 +157,35 @@ public class ClusterNodeInfo {
*/
private String readWriteMode;
+ /**
+ * The state of the cluster node.
+ */
+ private String state;
+
+ /**
+ * The revLock value of the cluster node.
+ */
+ private String revRecoveryLock;
+
ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId) {
this.id = id;
- this.startTime = System.currentTimeMillis();
+ this.startTime = (clock == null ? System.currentTimeMillis() : clock.getTime());
+ this.leaseEndTime = startTime;
+ this.store = store;
+ this.machineId = machineId;
+ this.instanceId = instanceId;
+ }
+
+ ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId, String state,
+ String revRecoveryLock) {
+ this.id = id;
+ this.startTime = (clock == null ? System.currentTimeMillis() : clock.getTime());
this.leaseEndTime = startTime;
this.store = store;
this.machineId = machineId;
this.instanceId = instanceId;
+ this.state = state;
+ this.revRecoveryLock = revRecoveryLock;
}
public int getId() {
@@ -144,8 +193,15 @@ public class ClusterNodeInfo {
}
/**
+ * <b>Only Used For Testing</b>
+ */
+ static void setClock(Clock c) {
+ clock = c;
+ }
+
+ /**
* Create a cluster node info instance for the store, with the
- *
+ *
* @param store the document store (for the lease)
* @return the cluster node info
*/
@@ -155,13 +211,14 @@ public class ClusterNodeInfo {
/**
* Create a cluster node info instance for the store.
- *
+ *
* @param store the document store (for the lease)
* @param machineId the machine id (null for MAC address)
* @param instanceId the instance id (null for current working directory)
* @return the cluster node info
*/
- public static ClusterNodeInfo getInstance(DocumentStore store, String machineId, String instanceId) {
+ public static ClusterNodeInfo getInstance(DocumentStore store, String machineId,
+ String instanceId) {
if (machineId == null) {
machineId = MACHINE_ID;
}
@@ -174,9 +231,14 @@ public class ClusterNodeInfo {
update.set(ID, String.valueOf(clusterNode.id));
update.set(MACHINE_ID_KEY, clusterNode.machineId);
update.set(INSTANCE_ID_KEY, clusterNode.instanceId);
- update.set(LEASE_END_KEY, System.currentTimeMillis() + clusterNode.leaseTime);
+ update.set(LEASE_END_KEY,
+ (clock == null ? System.currentTimeMillis() : clock.getTime())
+ + clusterNode.leaseTime);
update.set(INFO_KEY, clusterNode.toString());
- boolean success = store.create(Collection.CLUSTER_NODES, Collections.singletonList(update));
+ update.set(STATE, clusterNode.state);
+ update.set(REV_RECOVERY_LOCK, clusterNode.revRecoveryLock);
+ boolean success =
+ store.create(Collection.CLUSTER_NODES, Collections.singletonList(update));
if (success) {
return clusterNode;
}
@@ -184,13 +246,15 @@ public class ClusterNodeInfo {
throw new MicroKernelException("Could not get cluster node info");
}
- private static ClusterNodeInfo createInstance(DocumentStore store, String machineId, String instanceId) {
- long now = System.currentTimeMillis();
+ private static ClusterNodeInfo createInstance(DocumentStore store, String machineId,
+ String instanceId) {
+ long now = (clock == null ? System.currentTimeMillis() : clock.getTime());
// keys between "0" and "a" includes all possible numbers
List<ClusterNodeInfoDocument> list = store.query(Collection.CLUSTER_NODES,
"0", "a", Integer.MAX_VALUE);
int clusterNodeId = 0;
int maxId = 0;
+ String state = null;
for (Document doc : list) {
String key = doc.getId();
int id;
@@ -222,28 +286,30 @@ public class ClusterNodeInfo {
if (clusterNodeId == 0 || id < clusterNodeId) {
// if there are multiple, use the smallest value
clusterNodeId = id;
+ state = (String) doc.get(STATE);
}
}
if (clusterNodeId == 0) {
clusterNodeId = maxId + 1;
}
- return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId);
+ return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state, null);
}
/**
* Renew the cluster id lease. This method needs to be called once in a while,
* to ensure the same cluster id is not re-used by a different instance.
- *
+ *
* @param nextCheckMillis the millisecond offset
*/
public void renewLease(long nextCheckMillis) {
- long now = System.currentTimeMillis();
+ long now = (clock == null ? System.currentTimeMillis() : clock.getTime());
if (now + nextCheckMillis + nextCheckMillis < leaseEndTime) {
return;
}
UpdateOp update = new UpdateOp("" + id, true);
leaseEndTime = now + leaseTime;
update.set(LEASE_END_KEY, leaseEndTime);
+ update.set(STATE, ACTIVE_STATE);
ClusterNodeInfoDocument doc = store.createOrUpdate(Collection.CLUSTER_NODES, update);
String mode = (String) doc.get(READ_WRITE_MODE_KEY);
if (mode != null && !mode.equals(readWriteMode)) {
@@ -263,6 +329,8 @@ public class ClusterNodeInfo {
public void dispose() {
UpdateOp update = new UpdateOp("" + id, true);
update.set(LEASE_END_KEY, null);
+ update.set(STATE, null);
+ update.set(REV_RECOVERY_LOCK, null);
store.createOrUpdate(Collection.CLUSTER_NODES, update);
}
@@ -273,8 +341,10 @@ public class ClusterNodeInfo {
"machineId: " + machineId + ",\n" +
"instanceId: " + instanceId + ",\n" +
"pid: " + PROCESS_ID + ",\n" +
- "uuid: " + uuid +",\n" +
- "readWriteMode: " + readWriteMode;
+ "uuid: " + uuid + ",\n" +
+ "readWriteMode: " + readWriteMode + ",\n" +
+ "state: " + state + ",\n" +
+ "revLock: " + revRecoveryLock;
}
private static long getProcessId() {
@@ -290,7 +360,7 @@ public class ClusterNodeInfo {
/**
* Calculate the unique machine id. This is the lowest MAC address if
* available. As an alternative, a randomly generated UUID is used.
- *
+ *
* @return the unique id
*/
private static String getMachineId() {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1583888&r1=1583887&r2=1583888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Wed Apr 2 06:12:41 2014
@@ -291,6 +291,8 @@ public final class DocumentNodeStore
private final Executor executor;
+ private final LastRevRecoveryAgent lastRevRecoveryAgent;
+
public DocumentNodeStore(DocumentMK.Builder builder) {
this.blobStore = builder.getBlobStore();
if (builder.isUseSimpleRevision()) {
@@ -322,6 +324,7 @@ public final class DocumentNodeStore
this.branches = new UnmergedBranches(getRevisionComparator());
this.asyncDelay = builder.getAsyncDelay();
this.versionGarbageCollector = new VersionGarbageCollector(this);
+ this.lastRevRecoveryAgent = new LastRevRecoveryAgent(this);
this.missing = new DocumentNodeState(this, "MISSING", new Revision(0, 0, 0)) {
@Override
public int getMemory() {
@@ -378,11 +381,19 @@ public final class DocumentNodeStore
new BackgroundOperation(this, isDisposed),
"DocumentNodeStore background thread");
backgroundThread.setDaemon(true);
+ checkLastRevRecovery();
backgroundThread.start();
LOG.info("Initialized DocumentNodeStore with clusterNodeId: {}", clusterId);
}
+ /**
+ * Runs the _lastRev recovery for this cluster node if needed.
+ */
+ private void checkLastRevRecovery() {
+ lastRevRecoveryAgent.recover(clusterId);
+ }
+
public void dispose() {
runBackgroundOperations();
if (!isDisposed.getAndSet(true)) {
@@ -1707,4 +1718,8 @@ public final class DocumentNodeStore
public VersionGarbageCollector getVersionGarbageCollector() {
return versionGarbageCollector;
}
+ @Nonnull
+ public LastRevRecoveryAgent getLastRevRecoveryAgent() {
+ return lastRevRecoveryAgent;
+ }
}
Copied: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (from r1583882, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?p2=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java&r1=1583882&r2=1583888&rev=1583888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Wed Apr 2 06:12:41 2014
@@ -19,6 +19,12 @@
package org.apache.jackrabbit.oak.plugins.document;
+import static com.google.common.collect.ImmutableList.of;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.mergeSorted;
+
+import java.io.Closeable;
+import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
@@ -27,23 +33,80 @@ import javax.annotation.CheckForNull;
import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
+
import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoMissingLastRevSeeker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.collect.ImmutableList.of;
-import static com.google.common.collect.Iterables.filter;
-import static com.google.common.collect.Iterables.mergeSorted;
-public class LastRevRecovery {
+/**
+ * Utility class for recovering potential missing _lastRev updates of nodes due to crash of a node.
+ */
+public class LastRevRecoveryAgent {
private final Logger log = LoggerFactory.getLogger(getClass());
private final DocumentNodeStore nodeStore;
- public LastRevRecovery(DocumentNodeStore nodeStore) {
+ private final MissingLastRevSeeker missingLastRevUtil;
+
+ LastRevRecoveryAgent(DocumentNodeStore nodeStore) {
this.nodeStore = nodeStore;
+
+ if (nodeStore.getDocumentStore() instanceof MongoDocumentStore) {
+ this.missingLastRevUtil =
+ new MongoMissingLastRevSeeker((MongoDocumentStore) nodeStore.getDocumentStore());
+ } else {
+ this.missingLastRevUtil = new MissingLastRevSeeker(nodeStore.getDocumentStore());
+ }
}
+ /**
+ * Recover the correct _lastRev updates for potentially missing candidate nodes.
+ *
+ * @param clusterId the cluster id for which the _lastRev are to be recovered
+ * @return the number of restored nodes
+ */
+ public int recover(int clusterId) {
+ ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId);
+
+ if (nodeInfo != null) {
+ Long leaseEnd = (Long) (nodeInfo.get(ClusterNodeInfo.LEASE_END_KEY));
+
+ // Check if _lastRev recovery needed for this cluster node
+ // state != null && recoveryLock not held by someone
+ if (nodeInfo.get(ClusterNodeInfo.STATE) != null
+ && nodeInfo.get(ClusterNodeInfo.REV_RECOVERY_LOCK) == null) {
+
+ // retrieve the root document's _lastRev
+ NodeDocument root = missingLastRevUtil.getRoot();
+ Revision lastRev = root.getLastRev().get(clusterId);
+
+ // start time is the _lastRev timestamp of this cluster node
+ long startTime = lastRev.getTimestamp();
+
+ // Endtime is the leaseEnd + the asyncDelay
+ long endTime = leaseEnd + nodeStore.getAsyncDelay();
+
+ log.info("Recovering candidates modified in time range : {0}",
+ new Object[] {startTime, endTime});
+
+ return recoverCandidates(clusterId, startTime, endTime);
+ }
+ }
+
+ log.info("No recovery needed for clusterId");
+ return 0;
+ }
+
+ /**
+ * Recover the correct _lastRev updates for the given candidate nodes.
+ *
+ * @param suspects the potential suspects
+ * @param clusterId the cluster id for which _lastRev recovery needed
+ * @return the number of restored nodes
+ */
public int recover(Iterator<NodeDocument> suspects, int clusterId) {
UnsavedModifications unsaved = new UnsavedModifications();
UnsavedModifications unsavedParents = new UnsavedModifications();
@@ -87,15 +150,15 @@ public class LastRevRecovery {
}
}
- for(String parentPath : unsavedParents.getPaths()){
+ for (String parentPath : unsavedParents.getPaths()) {
Revision calcLastRev = unsavedParents.get(parentPath);
Revision knownLastRev = knownLastRevs.get(parentPath);
//Copy the calcLastRev of parent only if they have changed
//In many case it might happen that parent have consistent lastRev
//This check ensures that unnecessary updates are not made
- if(knownLastRev == null
- || calcLastRev.compareRevisionTime(knownLastRev) > 0){
+ if (knownLastRev == null
+ || calcLastRev.compareRevisionTime(knownLastRev) > 0) {
unsaved.put(parentPath, calcLastRev);
}
}
@@ -105,7 +168,8 @@ public class LastRevRecovery {
int size = unsaved.getPaths().size();
if (log.isDebugEnabled()) {
- log.debug("Last revision for following documents would be updated {}", unsaved.getPaths());
+ log.debug("Last revision for following documents would be updated {}", unsaved
+ .getPaths());
}
//UnsavedModifications is designed to be used in concurrent
@@ -120,13 +184,47 @@ public class LastRevRecovery {
}
/**
+ * Retrieves possible candidates which have been modified in the time range and recovers the
+ * missing updates.
+ *
+ * @param clusterId the cluster id
+ * @param startTime the start time
+ * @param endTime the end time
+ * @return the number of restored nodes
+ */
+ private int recoverCandidates(final int clusterId, final long startTime, final long endTime) {
+ // take a lock on the update process by setting the value of the lock to true
+ updateRecoveryLockOnCluster(clusterId, ClusterNodeInfo.REV_RECOVERY_ON);
+
+ Iterable<NodeDocument> suspects =
+ missingLastRevUtil.getCandidates(startTime, endTime);
+ if (log.isDebugEnabled()) {
+ log.debug("_lastRev recovery candidates : {}", suspects);
+ }
+
+ try {
+ return recover(suspects.iterator(), clusterId);
+ } finally {
+ if (suspects instanceof Closeable) {
+ try {
+ ((Closeable) suspects).close();
+ } catch (IOException e) {
+ log.error("Error closing iterable : ", e);
+ }
+ }
+ // Relinquish the lock on the recovery for the cluster on the clusterInfo
+ updateRecoveryLockOnCluster(clusterId, null);
+ }
+ }
+
+ /**
* Determines the last revision value which needs to set for given clusterId
* on the passed document. If the last rev entries are consisted
- *
- * @param doc NodeDocument where lastRev entries needs to be fixed
+ *
+ * @param doc NodeDocument where lastRev entries needs to be fixed
* @param clusterId clusterId for which lastRev has to be checked
* @return lastRev which needs to be updated. <tt>null</tt> if no
- * updated is required i.e. lastRev entries are valid
+ * updated is required i.e. lastRev entries are valid
*/
@CheckForNull
private Revision determineMissedLastRev(NodeDocument doc, int clusterId) {
@@ -137,34 +235,46 @@ public class LastRevRecovery {
ClusterPredicate cp = new ClusterPredicate(clusterId);
- //Merge sort the revs for which changes have been made
- //to this doc
+ // Merge sort the revs for which changes have been made
+ // to this doc
- //localMap always keeps the most recent valid commit entry
- //per cluster node so looking into that should be sufficient
+ // localMap always keeps the most recent valid commit entry
+ // per cluster node so looking into that should be sufficient
Iterable<Revision> revs = mergeSorted(of(
- filter(doc.getLocalCommitRoot().keySet(), cp),
- filter(doc.getLocalRevisions().keySet(), cp)),
+ filter(doc.getLocalCommitRoot().keySet(), cp),
+ filter(doc.getLocalRevisions().keySet(), cp)),
StableRevisionComparator.REVERSE
- );
+ );
- //Look for latest valid revision > currentLastRev
- //if found then lastRev needs to be fixed
+ // Look for latest valid revision > currentLastRev
+ // if found then lastRev needs to be fixed
for (Revision rev : revs) {
if (rev.compareRevisionTime(currentLastRev) > 0) {
if (doc.isCommitted(rev)) {
return rev;
}
} else {
- //No valid revision found > currentLastRev
- //indicates that lastRev is valid for given clusterId
- //and no further checks are required
+ // No valid revision found > currentLastRev
+ // indicates that lastRev is valid for given clusterId
+ // and no further checks are required
break;
}
}
return null;
}
+ /**
+ * Set/Unset lock value on the clusterInfo for the clusterId
+ *
+ * @param clusterId for which _lastRev recovery operation performed
+ * @param value the lock value
+ */
+ protected void updateRecoveryLockOnCluster(int clusterId, String value) {
+ UpdateOp update = new UpdateOp("" + clusterId, true);
+ update.set(ClusterNodeInfo.REV_RECOVERY_LOCK, value);
+ nodeStore.getDocumentStore().createOrUpdate(Collection.CLUSTER_NODES, update);
+ }
+
private static class ClusterPredicate implements Predicate<Revision> {
private final int clusterId;
@@ -178,3 +288,4 @@ public class LastRevRecovery {
}
}
}
+
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java?rev=1583888&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java Wed Apr 2 06:12:41 2014
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+
+/**
+ * Utils to retrieve _lastRev missing update candidates.
+ */
+public class MissingLastRevSeeker {
+ protected final String ROOT_PATH = "/";
+ private final DocumentStore store;
+
+ public MissingLastRevSeeker(DocumentStore store) {
+ this.store = store;
+ }
+
+ /**
+ * Gets the cluster node info for the given cluster node id.
+ *
+ * @param clusterId the cluster id
+ * @return the cluster node info
+ */
+ public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) {
+ // Fetch all documents.
+ List<ClusterNodeInfoDocument> nodes = store.query(Collection.CLUSTER_NODES, "0",
+ "a", Integer.MAX_VALUE);
+ Iterable<ClusterNodeInfoDocument> clusterIterable =
+ Iterables.filter(nodes,
+ new Predicate<ClusterNodeInfoDocument>() {
+ // Return cluster info for the required clusterId
+ @Override
+ public boolean apply(ClusterNodeInfoDocument input) {
+ String id = input.getId();
+ return (id.equals(String.valueOf(clusterId)));
+ }
+ });
+
+ if (clusterIterable.iterator().hasNext()) {
+ return clusterIterable.iterator().next();
+ }
+
+ return null;
+ }
+
+ /**
+ * Get the candidates with modified time between the time range specified.
+ *
+ * @param startTime the start of the time range
+ * @param endTime the end of the time range
+ * @return the candidates
+ */
+ public Iterable<NodeDocument> getCandidates(final long startTime, final long endTime) {
+ // Fetch all documents.
+ List<NodeDocument> nodes = store.query(Collection.NODES, NodeDocument.MIN_ID_VALUE,
+ NodeDocument.MAX_ID_VALUE, Integer.MAX_VALUE);
+ return Iterables.filter(nodes, new Predicate<NodeDocument>() {
+ @Override
+ public boolean apply(NodeDocument input) {
+ Long modified = (Long) input.get(NodeDocument.MODIFIED_IN_SECS);
+ return (modified != null
+ && (modified > TimeUnit.MILLISECONDS.toSeconds(startTime))
+ && (modified < TimeUnit.MILLISECONDS.toSeconds(endTime)));
+ }
+ });
+ }
+
+ public NodeDocument getRoot() {
+ return store.find(Collection.NODES, Utils.getIdFromPath(ROOT_PATH));
+ }
+}
+
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java?rev=1583888&r1=1583887&r2=1583888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java Wed Apr 2 06:12:41 2014
@@ -24,6 +24,7 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.jackrabbit.oak.stats.Clock;
/**
* A revision.
*/
@@ -58,6 +59,20 @@ public class Revision {
*/
private final boolean branch;
+ /** Only set for testing */
+ private static Clock clock;
+
+ /**
+ * <b>
+ * Only to be used for testing.
+ * Do Not Use Otherwise
+ * </b>
+ *
+ * @param c - the clock
+ */
+ static void setClock(Clock c) {
+ clock = c;
+ }
public Revision(long timestamp, int counter, int clusterId) {
this(timestamp, counter, clusterId, false);
}
@@ -150,6 +165,9 @@ public class Revision {
*/
public static long getCurrentTimestamp() {
long timestamp = System.currentTimeMillis();
+ if (clock != null) {
+ timestamp = clock.getTime();
+ }
if (timestamp < lastTimestamp) {
// protect against decreases in the system time,
// time machines, and other fluctuations in the time continuum
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java?rev=1583888&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java Wed Apr 2 06:12:41 2014
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import static com.google.common.collect.Iterables.transform;
+
+import com.google.common.base.Function;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.QueryBuilder;
+import com.mongodb.ReadPreference;
+
+import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.apache.jackrabbit.oak.plugins.document.Commit;
+import org.apache.jackrabbit.oak.plugins.document.MissingLastRevSeeker;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
+
+/**
+ * Mongo specific version of MissingLastRevSeeker which uses mongo queries
+ * to fetch candidates which may have missed '_lastRev' updates.
+ *
+ * Uses a time range to find documents modified during that interval.
+ */
+public class MongoMissingLastRevSeeker extends MissingLastRevSeeker {
+ private final MongoDocumentStore store;
+
+ public MongoMissingLastRevSeeker(MongoDocumentStore store) {
+ super(store);
+ this.store = store;
+ }
+
+ @Override
+ public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) {
+ DBObject query =
+ QueryBuilder
+ .start(NodeDocument.ID).is(String.valueOf(clusterId)).get();
+ DBCursor cursor =
+ getClusterNodeCollection().find(query)
+ .setReadPreference(ReadPreference.secondaryPreferred());
+ try {
+ if (cursor.hasNext()) {
+ DBObject obj = cursor.next();
+ return store.convertFromDBObject(Collection.CLUSTER_NODES, obj);
+ }
+ } finally {
+ cursor.close();
+ }
+ return null;
+ }
+
+ @Override
+ public CloseableIterable<NodeDocument> getCandidates(final long startTime,
+ final long endTime) {
+ DBObject query =
+ QueryBuilder
+ .start(NodeDocument.MODIFIED_IN_SECS).lessThanEquals(
+ Commit.getModifiedInSecs(endTime))
+ .put(NodeDocument.MODIFIED_IN_SECS).greaterThanEquals(
+ Commit.getModifiedInSecs(startTime))
+ .get();
+ DBObject sortFields = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, -1);
+
+ DBCursor cursor =
+ getNodeCollection().find(query)
+ .sort(sortFields)
+ .setReadPreference(
+ ReadPreference.secondaryPreferred());
+ return CloseableIterable.wrap(transform(cursor, new Function<DBObject, NodeDocument>() {
+ @Override
+ public NodeDocument apply(DBObject input) {
+ return store.convertFromDBObject(Collection.NODES, input);
+ }
+ }), cursor);
+ }
+
+ private DBCollection getNodeCollection() {
+ return store.getDBCollection(Collection.NODES);
+ }
+
+ private DBCollection getClusterNodeCollection() {
+ return store.getDBCollection(Collection.CLUSTER_NODES);
+ }
+}
+
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java?rev=1583888&r1=1583887&r2=1583888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java Wed Apr 2 06:12:41 2014
@@ -134,5 +134,14 @@ public abstract class DocumentStoreFixtu
return false;
}
}
+
+ @Override
+ public void dispose() { // best-effort cleanup: drops the database behind this fixture's uri
+     try {
+         MongoConnection connection = new MongoConnection(uri);
+         try { connection.getDB().dropDatabase(); } finally { connection.close(); } // never leak the client
+     } catch (Exception ignored) { // deliberate: a failed cleanup must not fail the calling test
+     }
+ }
}
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java?rev=1583888&r1=1583887&r2=1583888&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java Wed Apr 2 06:12:41 2014
@@ -90,7 +90,7 @@ public class LastRevRecoveryTest {
//lastRev should not be updated for C #2
assertNull(y1.getLastRev().get(2));
- LastRevRecovery recovery = new LastRevRecovery(ds1);
+ LastRevRecoveryAgent recovery = new LastRevRecoveryAgent(ds1);
//Do not pass y1 but still y1 should be updated
recovery.recover(Iterators.forArray(x1,z1), 2);
Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java?rev=1583888&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java Wed Apr 2 06:12:41 2014
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests the restore of potentially missing _lastRev updates.
+ */
+@RunWith(Parameterized.class)
+public class LastRevSingleNodeRecoveryTest {
+ private DocumentStoreFixture fixture;
+
+ private Clock clock;
+
+ private DocumentMK mk;
+
+ public LastRevSingleNodeRecoveryTest(DocumentStoreFixture fixture) {
+ this.fixture = fixture;
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> fixtures() throws IOException {
+ List<Object[]> fixtures = Lists.newArrayList();
+
+ DocumentStoreFixture mongo = new DocumentStoreFixture.MongoFixture();
+ if (mongo.isAvailable()) {
+ fixtures.add(new Object[] {mongo});
+ }
+ return fixtures;
+ }
+
+ private DocumentMK createMK(int clusterId) throws InterruptedException {
+ clock = new Clock.Virtual();
+ return openMK(clusterId, fixture.createDocumentStore());
+ }
+
+ private DocumentMK openMK(int clusterId, DocumentStore store) throws InterruptedException {
+ clock.waitUntil(System.currentTimeMillis());
+
+ // Sets the clock for testing
+ ClusterNodeInfo.setClock(clock);
+ Revision.setClock(clock);
+
+ DocumentMK.Builder builder = new DocumentMK.Builder();
+ builder.setAsyncDelay(0)
+ .setClusterId(clusterId)
+ .clock(clock)
+ .setDocumentStore(store);
+ mk = builder.open();
+ clock.waitUntil(Revision.getCurrentTimestamp());
+
+ return mk;
+ }
+
+ @Before
+ public void setUp() throws InterruptedException {
+ try {
+ mk = createMK(0);
+ Assume.assumeNotNull(mk);
+
+ // initialize node hierarchy
+ mk.commit("/", "+\"x\" : { \"y\": {\"z\":{} } }", null, null);
+ mk.commit("/", "+\"a\" : { \"b\": {\"c\": {}} }", null, null);
+ } catch (Exception e) {
+ Assume.assumeNoException(e);
+ }
+ }
+
+ @Test
+ public void testLastRevRestoreOnNodeStart() throws Exception {
+ // pending updates
+ setupScenario();
+
+ // renew lease
+ clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10);
+ mk.getClusterInfo().renewLease(0);
+
+ // so that the current time is more than the current lease end
+ clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000);
+ // Recreate mk instance, to simulate fail condition and recovery on start
+ mk = openMK(0, mk.getNodeStore().getDocumentStore());
+
+ int pendingCount = mk.getPendingWriteCount();
+
+ // so that the current time is more than the current lease end
+ clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000);
+ // Immediately check again, now should not have done any changes.
+ LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent();
+ /** Now there should have been pendingCount updates **/
+ assertEquals(pendingCount, recoveryAgent.recover(mk.getClusterInfo().getId()));
+ }
+
+ @Test
+ public void testLastRevRestore() throws Exception {
+ setupScenario();
+
+ int pendingCount = mk.getPendingWriteCount();
+ // so that the current time is more than the current lease end
+ clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000);
+ LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent();
+
+ /** All pending updates should have been restored **/
+ assertEquals(pendingCount, recoveryAgent.recover(mk.getClusterInfo().getId()));
+ }
+
+
+ @Test
+ public void testNoMissingUpdates() throws Exception {
+ setupScenario();
+ mk.backgroundWrite();
+
+ // move the time forward and do another update of the root node so that only 2 nodes are
+ // candidates
+ clock.waitUntil(clock.getTime() + 5000);
+ mk.commit("/", "^\"a/key2\" : \"value2\"", null, null);
+ mk.backgroundWrite();
+
+ clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime());
+ mk.getClusterInfo().renewLease(0);
+
+ // Should be 0
+ int pendingCount = mk.getPendingWriteCount();
+ LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent();
+
+ clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime());
+ /** There should have been no updates **/
+ assertEquals(pendingCount, recoveryAgent.recover(mk.getClusterInfo().getId()));
+ }
+
+ private void setupScenario() throws InterruptedException {
+ // add some nodes which won't be returned
+ mk.commit("/", "+\"u\" : { \"v\": {}}", null, null);
+ mk.commit("/u", "^\"v/key1\" : \"value1\"", null, null);
+
+ // move the time forward so that the root gets updated
+ clock.waitUntil(clock.getTime() + 5000);
+ mk.commit("/", "^\"a/key1\" : \"value1\"", null, null);
+ mk.backgroundWrite();
+
+ // move the time forward to have a new node under root
+ clock.waitUntil(clock.getTime() + 5000);
+ mk.commit("/", "+\"p\":{}", null, null);
+
+ // move the time forward to write all pending changes
+ clock.waitUntil(clock.getTime() + 5000);
+ mk.backgroundWrite();
+
+ // renew lease one last time
+ clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime());
+ mk.getClusterInfo().renewLease(0);
+
+ clock.waitUntil(clock.getTime() + 5000);
+ // add nodes won't trigger _lastRev updates
+ addNodes();
+ }
+
+ /**
+ * Should have the
+ */
+ private void addNodes() {
+ // change node /a/b/c by adding a property
+ mk.commit("/a/b", "^\"c/key1\" : \"value1\"", null, null);
+ // add node /a/b/c/d
+ mk.commit("/a/b/c", "+\"d\":{}", null, null);
+ // add node /a/b/f
+ mk.commit("/a/b", "+\"f\" : {}", null, null);
+ // add node /a/b/f/e
+ mk.commit("/a/b/f", "+\"e\": {}", null, null);
+ // change node /x/y/z
+ mk.commit("/x/y", "^\"z/key1\" : \"value1\"", null, null);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Revision.setClock(null);
+ ClusterNodeInfo.setClock(null);
+ mk.dispose();
+ fixture.dispose();
+ }
+}
+
Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
------------------------------------------------------------------------------
svn:eol-style = native