You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2014/04/02 08:14:21 UTC
svn commit: r1583893 -
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
Author: chetanm
Date: Wed Apr 2 06:14:21 2014
New Revision: 1583893
URL: http://svn.apache.org/r1583893
Log:
OAK-1295 - Recovery for missing _lastRev updates (WIP)
Refactored to use new methods
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java?rev=1583893&r1=1583892&r2=1583893&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java Wed Apr 2 06:14:21 2014
@@ -23,8 +23,6 @@ import static com.google.common.collect.
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.mergeSorted;
-import java.io.Closeable;
-import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
@@ -37,6 +35,7 @@ import com.google.common.collect.Maps;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoMissingLastRevSeeker;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,12 +71,12 @@ public class LastRevRecoveryAgent {
ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId);
if (nodeInfo != null) {
- Long leaseEnd = (Long) (nodeInfo.get(ClusterNodeInfo.LEASE_END_KEY));
+ long leaseEnd = nodeInfo.getLeaseEndTime();
// Check if _lastRev recovery needed for this cluster node
// i.e. the node is still marked active && recoveryLock not held by anyone
- if (nodeInfo.get(ClusterNodeInfo.STATE) != null
- && nodeInfo.get(ClusterNodeInfo.REV_RECOVERY_LOCK) == null) {
+ if (nodeInfo.isActive()
+ && !nodeInfo.isBeingRecovered()) {
// retrieve the root document's _lastRev
NodeDocument root = missingLastRevUtil.getRoot();
@@ -89,14 +88,15 @@ public class LastRevRecoveryAgent {
// Endtime is the leaseEnd + the asyncDelay
long endTime = leaseEnd + nodeStore.getAsyncDelay();
- log.info("Recovering candidates modified in time range : {0}",
- new Object[] {startTime, endTime});
+ log.info("Recovering candidates modified in time range : [{},{}] for clusterId [{}]",
+ Utils.timestampToString(startTime),
+ Utils.timestampToString(endTime), clusterId);
return recoverCandidates(clusterId, startTime, endTime);
}
}
- log.info("No recovery needed for clusterId");
+ log.debug("No recovery needed for clusterId {}", clusterId);
return 0;
}
@@ -193,27 +193,25 @@ public class LastRevRecoveryAgent {
* @return the int the number of restored nodes
*/
private int recoverCandidates(final int clusterId, final long startTime, final long endTime) {
- // take a lock on the update process by setting the value of the lock to true
- updateRecoveryLockOnCluster(clusterId, ClusterNodeInfo.REV_RECOVERY_ON);
+ boolean lockAcquired = missingLastRevUtil.acquireRecoveryLock(clusterId);
- Iterable<NodeDocument> suspects =
- missingLastRevUtil.getCandidates(startTime, endTime);
- if (log.isDebugEnabled()) {
- log.debug("_lastRev recovery candidates : {}", suspects);
+ //TODO What if recovery is being performed for the current clusterNode by some other node?
+ //Should we halt the startup?
+ if(!lockAcquired){
+ log.info("Last revision recovery already being performed by some other node. " +
+ "Would not attempt recovery");
+ return 0;
}
+ Iterable<NodeDocument> suspects = missingLastRevUtil.getCandidates(startTime, endTime);
+ log.debug("Performing Last Revision recovery for cluster {}", clusterId);
+
try {
return recover(suspects.iterator(), clusterId);
} finally {
- if (suspects instanceof Closeable) {
- try {
- ((Closeable) suspects).close();
- } catch (IOException e) {
- log.error("Error closing iterable : ", e);
- }
- }
+ Utils.closeIfCloseable(suspects);
// Relinquish the lock on the recovery for the cluster on the clusterInfo
- updateRecoveryLockOnCluster(clusterId, null);
+ missingLastRevUtil.releaseRecoveryLock(clusterId);
}
}
@@ -263,18 +261,6 @@ public class LastRevRecoveryAgent {
return null;
}
- /**
- * Set/Unset lock value on the clusterInfo for the clusterId
- *
- * @param clusterId for which _lastRev recovery operation performed
- * @param value the lock value
- */
- protected void updateRecoveryLockOnCluster(int clusterId, String value) {
- UpdateOp update = new UpdateOp("" + clusterId, true);
- update.set(ClusterNodeInfo.REV_RECOVERY_LOCK, value);
- nodeStore.getDocumentStore().createOrUpdate(Collection.CLUSTER_NODES, update);
- }
-
private static class ClusterPredicate implements Predicate<Revision> {
private final int clusterId;